File: //proc/self/root/opt/alt/python37/lib/python3.7/site-packages/cl_dom_collector/dom_collector.py
# -*- coding: utf-8 -*-
#
# Copyright CloudLinux Zug GmbH 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
# pylint: disable=no-absolute-import
import sys
import os
import base64
import traceback
import lvectllib
import subprocess
from cllimits import LveCtl
from clcommon.cpapi.cpapiexceptions import EncodingError
from clcommon.clpwd import ClPwd
from clcommon.cpapi import userdomains
class DomainCollector:
"""
Class for print panel users domain docroots.
See LU-1751 for details
"""
def __init__(self):
self.is_not_in_lve = not bool(os.environ.get('DOM_COLL_RUNNING_IN_LVE'))
self._lvectl = LveCtl()
self._clpwd = ClPwd()
def _get_panel_user_names_list(self):
"""
Get panel user names list
:return: user names list
"""
try:
panel_uids_list = self._lvectl.get_panel_users_uid_list()
except EncodingError:
var = traceback.format_exc()
print('domains_collector error: {}'.format(var), file=sys.stderr)
sys.exit(1)
panel_users_list = []
for uid in panel_uids_list:
try:
name = self._clpwd.get_names(uid)[0]
panel_users_list.append(name)
except ClPwd.NoSuchUserException:
pass
return panel_users_list
def _print_users_domains_in_base64(self):
"""
Prints users domains in base64 to stdout
"""
panel_users_list = self._get_panel_user_names_list()
docroot_list = []
for username in panel_users_list:
try:
for _, domain_docroot in userdomains(username):
docroot_list.append(domain_docroot)
except Exception:
# userdomains has very various implementations in cpapi plugins for different panels,
# so we catch common Exception instead separate exceptions
pass
for docroot in docroot_list:
print(base64.b64encode(docroot.encode('utf-8')).decode('utf-8'))
def run(self):
if self.is_not_in_lve:
try:
rc = self._run_self_in_lve()
exit(rc)
except lvectllib.PyLveError as e:
error_msg = 'failed to run task in lve, error: %s' % e
print(error_msg)
exit(-1)
else:
# We in LVE, collect docroots list
self._print_users_domains_in_base64()
# TODO: we need this bicycle because method pylve.lve_enter_pid does not work properly (surprise!)
# when we call lve_enter_pid, lve limits process only by cpu usage, other parameters are unlimited
@staticmethod
def _run_self_in_lve():
"""
Run same command in lve and set environ RUNNING_IN_LVE=True
in order to check it in child process.
:return:
"""
settings = lvectllib.make_liblve_settings(
ls_cpu=15, # 15 percents of CPU (NOT core)
ls_cpus=0,
ls_memory_phy=1024 * 1024 ** 2 # 1gb
)
with lvectllib.temporary_lve(settings) as lve_id:
return subprocess.call(
['/bin/lve_suwrapper', '-n', str(lve_id), '/usr/bin/cloudlinux_domains_collector'],
env=dict(os.environ, DOM_COLL_RUNNING_IN_LVE='1'))