# Edit File: mod_security.py
import logging
import os
import shutil
import textwrap
import zipfile
from abc import ABC, abstractclassmethod
from contextlib import suppress
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse

import yaml
from async_lru import alru_cache

from defence360agent.contracts.config import ConfigFile
from defence360agent.subsys.panels.base import (
    ModsecVendorsError,
    forbid_dns_only,
)
from defence360agent.subsys.panels.cpanel.whm import (
    WHMAPIException,
    catch_exception,
    whmapi1,
)
from defence360agent.utils import (
    CheckRunError,
    async_lru_cache,
    atomic_rewrite,
    check_run,
    nice_iterator,
)
from im360.contracts.config import ModSecurityDirectives
from im360.subsys.panels.base import (
    MODSEC_NAME_TEMPLATE,
    FilesVendor,
    FilesVendorList,
    ModSecSettingInterface,
    ModSecurityInterface,
    skip_if_not_installed_modsec,
    APACHE,
    LITESPEED,
)

#: path to paths.conf file from the ea-apache24-config-runtime package
EA4_PATHS_CONF_PATH = Path("/etc/cpanel/ea4/paths.conf")
#: presence of the file indicates it is EasyApache 4
IS_EA4_PATH = Path("/etc/cpanel/ea4/is_ea4")
#: path to modsec_vendor script
MODSEC_VENDOR_BIN = "/usr/local/cpanel/scripts/modsec_vendor"
NON_CONFLICTING_RULESETS = (
    "comodo_apache",
    "comodo_litespeed",
    "imunify360_rules",
    "configserver",
)
MODSEC_VENDOR_VAR_DIR = Path("/var/cpanel/modsec_vendors")

logger = logging.getLogger(__name__)


async def _modsec_vendor_cmd(cmd, param):
    """
    Run ``modsec_vendor <cmd> <param>``.

    :param str cmd: modsec_vendor sub-command
    :param str param: the single argument passed to the sub-command
    :raise WHMAPIException: presumably on a failed run, since it is handed
        to ``check_run`` as ``raise_exc`` -- confirm against ``check_run``
    """
    await check_run([MODSEC_VENDOR_BIN, cmd, param], raise_exc=WHMAPIException)


class ModSecSetting(ModSecSettingInterface):
    """Apply/revert the ModSecurity engine directives through whmapi1."""

    config_key = "prev_settings"
    # directive name -> setting id used with "modsec_set_setting"
    directives_ids = {
        "SecAuditEngine": "0",
        "SecConnEngine": "1",
        "SecRuleEngine": "2",
    }
    OFF = "Off"

    @classmethod
    async def apply(cls):
        """
        Reset ModSecurity settings to values chosen by Imunify360.

        :return str: previous settings, serialized as comma separated
            ``<setting id>:<state>`` pairs (blank states are left out)
        """
        # save previous settings
        previous_settings = await cPanelModSecurity.get_settings(
            *cls.directives_ids
        )
        # set settings from the Imunify360 config module
        for directive_name, directive_id in cls.directives_ids.items():
            await whmapi1(
                "modsec_set_setting",
                setting_id=directive_id,
                state=getattr(ModSecurityDirectives, directive_name),
            )
        await whmapi1("modsec_deploy_settings_changes")
        return ",".join(
            "{id}:{state}".format(id=cls.directives_ids[name], state=state)
            for name, state in sorted(previous_settings.items())
            if state.strip()
        )

    @classmethod
    async def revert(cls, prev_setting_value):
        """
        Apply the previous value of the settings.

        :param str prev_setting_value: value produced by :meth:`apply`;
            nothing is done when it is empty
        """
        if prev_setting_value:
            for item in prev_setting_value.split(","):
                setting_id, state = item.split(":")
                if state.strip():
                    await whmapi1(
                        "modsec_set_setting",
                        setting_id=setting_id,
                        state=state,
                    )
            await whmapi1("modsec_deploy_settings_changes")


class cPanelModSecurity(ModSecurityInterface):
    """cPanel implementation of the ModSecurity panel interface."""

    CWAF_INSTALLATION_DIR = "/var/cpanel/cwaf"
    AUDIT_LOG_FILE = "/usr/local/apache/logs/modsec_audit.log"
    DISABLED_RULES_CONFIG_DIR = "/etc/apache2/conf.d/"
    # to load after modsec2.conf
    GLOBAL_DISABLED_RULES_CONFIG_FILENAME = "modsec2.i360_disabled_rules.conf"
    PER_DOMAIN_DISABLED_RULES_CONFIG_FILENAME = "i360_modsec_disable.conf"
    # it should be included before disabled rules config
    # to ensure that MyImunify rules can be disabled by a customer
    PER_DOMAIN_MYIMUNIFY_RULES_CONFIG_FILENAME = "i360_001_myimunify.conf"
    REBUILD_HTTPDCONF_CMD = ("/usr/local/cpanel/scripts/rebuildhttpdconf",)

    @classmethod
    def _get_conf_dir(cls) -> str:
        """Return the directory holding the global disabled-rules config."""
        return cls.DISABLED_RULES_CONFIG_DIR

    @classmethod
    @lru_cache(maxsize=1)
    def _get_ea4_paths(cls) -> Optional[Dict[str, str]]:
        """Return a dict with ea4 paths or None for non-ea4 case."""
        if IS_EA4_PATH.exists():  # easy apache 4
            with EA4_PATHS_CONF_PATH.open() as f:
                # every "key = value" line becomes a stripped (key, value)
                # pair; lines without "=" are ignored
                paths_config = dict(
                    map(str.strip, line.partition("=")[::2])
                    for line in f
                    if "=" in line
                )
            return paths_config
        return None

    @classmethod
    def _get_userdata_dir(cls):
        """Return the Apache userdata directory for the detected EA version."""
        ea4_paths_config = cls._get_ea4_paths()
        if ea4_paths_config is not None:
            return ea4_paths_config.get(
                "dir_conf_userdata", "/etc/apache2/conf.d/userdata"
            )
        else:  # easy apache 3
            return "/usr/local/apache/conf/userdata"

    @classmethod
    def _get_domain_confs_paths(cls, user, domain, *, conf_name):
        """
        Return the per-domain include paths (ssl and std vhosts).

        :return: list of :class:`pathlib.Path`, "ssl" first, then "std"
        """
        # use hardcode version according to
        # https://docs.cpanel.net/ea4/apache/modify-apache-virtual-hosts-with-include-files/
        # now supported version apache >= 2.4
        # EA3 is EOL
        version_str = "2_4"
        userdata_dir = Path(cls._get_userdata_dir())
        return [
            userdata_dir / vhost_type / version_str / user / domain / conf_name
            for vhost_type in ("ssl", "std")
        ]

    @classmethod
    def _delete_domain_conf(cls, user, domain, *, conf_name) -> bool:
        """
        Remove *conf_name* from every per-domain include directory.

        :return: True if at least one file was actually removed
        """
        deleted = False
        for conf_path in cls._get_domain_confs_paths(
            user, domain, conf_name=conf_name
        ):
            # a missing file is not an error, it just does not count
            with suppress(FileNotFoundError):
                conf_path.unlink(missing_ok=False)
                deleted = True
        return deleted

    @classmethod
    def _add_domain_conf(cls, user, domain, *, conf_name, conf_text) -> bool:
        """
        Write *conf_text* to *conf_name* in every per-domain include dir.

        :return: truthy if ``atomic_rewrite`` reported an update for at
            least one of the files
        """
        updated = False
        for conf_path in cls._get_domain_confs_paths(
            user, domain, conf_name=conf_name
        ):
            logger.info("Adding domain configuration: %s", conf_path)
            conf_path.parent.mkdir(parents=True, exist_ok=True)
            updated |= atomic_rewrite(str(conf_path), conf_text, backup=False)
        return updated

    @classmethod
    async def sync_disabled_rules_for_domains(
        cls, domain_rules_map: Dict[str, list]
    ):
        """
        Write the disabled-rules config of every domain, then rebuild
        httpd.conf once.

        :param domain_rules_map: domain name -> list of rules to disable
        """
        logger.info("Sync disabled rules for [%s]", ",".join(domain_rules_map))
        for domain, rule_list in domain_rules_map.items():
            data = await whmapi1("domainuserdata", domain=domain)
            user = data["userdata"]["user"]
            cls._add_domain_conf(
                user,
                domain,
                conf_name=cls.PER_DOMAIN_DISABLED_RULES_CONFIG_FILENAME,
                conf_text=cls.generate_disabled_rules_config(rule_list),
            )
        await check_run(cls.REBUILD_HTTPDCONF_CMD)

    @classmethod
    async def apply_myimunify_modsec_rules_for_domains(
        cls, *, enabled_users_domains: dict, disabled_users_domains: dict
    ) -> set:
        """
        Add/remove the per-domain include of the "myimunify" vendor file.

        :param enabled_users_domains: user -> domains that get the include
        :param disabled_users_domains: user -> domains that lose the include
        :return: set of domains whose configuration has been changed
        """
        vendor = await cls.get_i360_vendor_name()
        myimunify_conf = await cls.build_vendor_file_path(vendor, "myimunify")
        conf_text = f"IncludeOptional {str(myimunify_conf)}\n"
        conf_name = cls.PER_DOMAIN_MYIMUNIFY_RULES_CONFIG_FILENAME
        updated_domains = set()

        def gen_user_domain(users_domains):
            for user, domains in users_domains.items():
                for domain in domains:
                    yield user, domain

        # add extended ruleset for enabled domains
        async for user, domain in nice_iterator(
            gen_user_domain(enabled_users_domains), chunk_size=1000
        ):
            if cls._add_domain_conf(
                user, domain, conf_name=conf_name, conf_text=conf_text
            ):
                updated_domains.add(domain)
        # delete extended ruleset for disabled domains
        async for user, domain in nice_iterator(
            gen_user_domain(disabled_users_domains), chunk_size=1000
        ):
            if cls._delete_domain_conf(user, domain, conf_name=conf_name):
                updated_domains.add(domain)
        # rebuilding httpd.conf is only needed when something changed
        if updated_domains:
            await check_run(cls.REBUILD_HTTPDCONF_CMD)
        return updated_domains

    @classmethod
    def write_global_disabled_rules(cls, rule_list):
        """
        Write the global disabled-rules config file.

        :param list rule_list: rules to sync
        :return:
        """
        os.makedirs(cls.DISABLED_RULES_CONFIG_DIR, exist_ok=True)
        atomic_rewrite(
            os.path.join(
                cls.DISABLED_RULES_CONFIG_DIR,
                cls.GLOBAL_DISABLED_RULES_CONFIG_FILENAME,
            ),
            cls.generate_disabled_rules_config(rule_list),
            backup=False,
        )

    @classmethod
    async def sync_global_disabled_rules(cls, rule_list):
        """
        Write the global disabled-rules config and rebuild httpd.conf.

        :param list rule_list: rules to sync
        :raise OSError: if rebuildhttpdconf returned not zero exit code
        :return:
        """
        cls.write_global_disabled_rules(rule_list)
        await check_run(cls.REBUILD_HTTPDCONF_CMD)

    @classmethod
    def _get_avalible_settings(cls):
        """Return the setting classes handled by :meth:`revert_settings`."""
        return [ModSecSetting, cPanelFilesVendorList]

    @classmethod
    def get_audit_log_path(cls):
        """Return the path of modsec_audit.log for the detected EA version."""
        ea4_paths_config = cls._get_ea4_paths()
        if ea4_paths_config is not None:
            log_dir = ea4_paths_config.get("dir_logs", "/etc/apache2/logs")
            return os.path.join(log_dir, "modsec_audit.log")
        else:  # easy apache 3
            return cls.AUDIT_LOG_FILE

    @classmethod
    def get_audit_logdir_path(cls):
        """Return the ModSecurity audit log directory."""
        return "/var/log/apache2/modsec_audit"

    @classmethod
    async def installed_modsec(cls):
        """Return True when whmapi1 reports ModSecurity as installed."""
        try:
            rc = (await whmapi1("modsec_is_installed"))["data"]["installed"]
        except (WHMAPIException, OSError):
            return False  # can't get status, assume not installed
        else:
            return rc == 1  # rc==1 means modsec is installed

    @forbid_dns_only
    @catch_exception
    async def _install_settings(self, reload_wafd=True):
        """Reset the directives first, then the rulesets."""
        await self.reset_modsec_directives()
        await self.reset_modsec_rulesets()

    async def reset_modsec_directives(self):
        # implement abstractmethod ModSecurityInterface.reset_modsec_directives
        await self._reset_modsec_setting(ModSecSetting)

    async def reset_modsec_rulesets(self):
        # implement abstractmethod ModSecurityInterface.reset_modsec_rulesets
        await self._reset_modsec_setting(cPanelFilesVendorList)

    @forbid_dns_only
    async def _reset_modsec_setting(self, setting):
        """Apply *setting* and store what ``apply()`` returned in MOD_SEC."""
        config = ConfigFile()
        config.set("MOD_SEC", setting.config_key, await setting.apply())

    @forbid_dns_only
    @skip_if_not_installed_modsec
    @catch_exception
    async def revert_settings(self, reload_wafd=True):
        """Revert install_settings()"""
        config = ConfigFile()
        for setting in self._get_avalible_settings():
            await setting.revert(config.get("MOD_SEC", setting.config_key))
            config.set("MOD_SEC", setting.config_key, None)

    @classmethod
    def detect_cwaf(cls):
        """
        Detects Comodo ModSecurity Rule Set

        :return: bool installed
        """
        return os.path.exists(cls.CWAF_INSTALLATION_DIR)

    @classmethod
    @async_lru_cache(maxsize=1)
    async def installed_modsec_vendors_data(cls) -> List[Dict]:
        """Returns list of dicts that describes ModSecurity vendors."""
        response = await whmapi1("modsec_get_vendors")
        return response.get("vendors", [])

    @classmethod
    async def enabled_modsec_vendors_data(cls) -> List[Dict]:
        """Returns list of dicts that describes enabled ModSecurity vendors."""
        vendors = await cls.installed_modsec_vendors_data()
        return [vendor for vendor in vendors if vendor["enabled"]]

    @classmethod
    async def invalidate_installed_vendors_cache(cls):
        """Drop the cached result of :meth:`installed_modsec_vendors_data`."""
        cls.installed_modsec_vendors_data.cache_clear()  # NOSONAR Pylint:E1101

    @classmethod
    async def modsec_vendor_list(cls) -> List[str]:
        """Return a list of installed ModSecurity vendors."""
        return [
            v["vendor_id"] for v in await cls.installed_modsec_vendors_data()
        ]

    @classmethod
    async def enabled_modsec_vendor_list(cls) -> List[str]:
        """Return a list of enabled ModSecurity vendors."""
        return [
            v["vendor_id"] for v in await cls.enabled_modsec_vendors_data()
        ]

    @classmethod
    async def modsec_get_directive(cls, directive_name, default=None):
        # implement abstractmethod ModSecurityInterface.modsec_get_directive
        try:
            return (await cls.get_settings(directive_name))[directive_name]
        except WHMAPIException:
            logger.exception("failed to get %s directive", directive_name)
            return default

    @classmethod
    async def get_settings(cls, *directive_names):
        """
        Return a mapping ModSecurity directive -> its state.

        :param directive_names: directives to include in the result
        :raise WHMAPIException: when the whmapi1 call fails or its output
            does not have the expected ``settings`` structure
        """
        response = await whmapi1("modsec_get_settings")
        # the "settings" lookup is inside the try block on purpose: any
        # malformed response must surface as WHMAPIException, which is the
        # only error modsec_get_directive() knows how to handle
        try:
            return {
                item["directive"]: item["state"]
                for item in response["settings"]
                if item["directive"] in directive_names
            }
        except (KeyError, TypeError) as err:
            raise WHMAPIException("Could not parse whmapi1 output") from err

    @classmethod
    async def build_vendor_file_path(cls, vendor: str, filename: str) -> Path:
        """
        Return the path of *filename* inside the directory of *vendor*.

        :raise ModsecVendorsError: if no installed vendor record with a
            non-empty ``path`` matches *vendor*
        """
        vendors_data = await cls.installed_modsec_vendors_data()
        vendor_path = next(
            (v["path"] for v in vendors_data if v["vendor_id"] == vendor), None
        )
        if vendor_path:
            return Path(vendor_path) / filename
        raise ModsecVendorsError(
            "Can't get vendor record for vendor {}."
            " Installed vendors: {}".format(
                vendor, [v["vendor_id"] for v in vendors_data]
            )
        )

    @classmethod
    def get_modsec_active_conf_files(cls) -> List[str]:
        """Return the enabled entries of the ``active_configs`` section."""
        return ModsecDatastore().get_as_list("active_configs")

    @classmethod
    def get_modsec_engine_mode(cls) -> str:
        """Return ``SecRuleEngine`` from the ``settings`` section."""
        return ModsecDatastore().get("settings").get("SecRuleEngine")

    @classmethod
    def get_modsec_vendor_updates(cls) -> List[str]:
        """Return the enabled entries of the ``vendor_updates`` section."""
        return ModsecDatastore().get_as_list("vendor_updates")

    @classmethod
    @skip_if_not_installed_modsec
    async def _apply_modsec_files_update(cls):
        """Refresh the vendors cache and re-apply the vendor list."""
        await cls.invalidate_installed_vendors_cache()
        await cPanelFilesVendorList.apply()


class ModsecDatastore:
    """Reader/writer of the YAML datastore stored at :attr:`PATH`."""

    PATH = "/var/cpanel/modsec_cpanel_conf_datastore"

    def _read_datastore(self):
        """
        Load the datastore.

        :return dict: parsed content; an empty dict when the file is
            missing, corrupted, empty or does not hold a mapping
        """
        try:
            with open(self.PATH) as f:
                data = yaml.safe_load(f)
        except FileNotFoundError:
            logger.error("Modsec datastore is not found: %s", self.PATH)
            return {}
        except yaml.YAMLError:
            logger.exception("Modsec datastore is corrupted: %s", self.PATH)
            return {}
        if isinstance(data, dict):
            return data
        # safe_load() gives None for an empty document; callers rely on
        # getting a dict back, so never hand out anything else
        if data is not None:
            logger.warning(
                "Modsec datastore has unexpected format: %s", self.PATH
            )
        return {}

    def _save_datastore(self, datastore):
        """
        Dump *datastore* to :attr:`PATH` as block-style YAML.

        Any failure is logged and re-raised.
        """
        try:
            with open(self.PATH, "w") as f:
                yaml.dump(datastore, f, default_flow_style=False)
        except Exception:
            logger.exception("Failed to save modsec datastore: %s", self.PATH)
            raise

    def get(self, section):
        """Return *section*, or an empty dict if it is missing or null."""
        value = self._read_datastore().get(section)
        return {} if value is None else value

    def get_as_list(self, section):
        """Get values as a list from the following yaml structure:

        ```
        section:
            value1: 1
            value2: 1
            value3: 0
        ```
        """
        return [
            value
            for value, enabled in self.get(section).items()
            if enabled == 1
        ]

    @staticmethod
    def _ensure_section(datastore, name):
        """Return ``datastore[name]``, replacing it by {} if not a dict."""
        section = datastore.get(name)
        if not isinstance(section, dict):
            section = datastore[name] = {}
        return section

    def update_vendor_data(self, vendor_id, vendor_configs, enable_updates):
        """
        Replace the records of *vendor_id* in the datastore and save it.

        :param str vendor_id: vendor whose records are rewritten
        :param list vendor_configs: config paths to mark as active; when
            empty the vendor is dropped from ``active_vendors`` and
            ``vendor_updates`` instead
        :param bool enable_updates: value stored in ``vendor_updates``
        """
        datastore = self._read_datastore()
        prefix = f"modsec_vendor_configs/{vendor_id}/"
        active_configs = datastore.get("active_configs")
        if active_configs:
            # collect the keys first: a dict can't shrink while iterated
            for conf in [c for c in active_configs if c.startswith(prefix)]:
                del active_configs[conf]
        if vendor_configs:
            # sections may be absent (or null), create them on demand
            active_configs = self._ensure_section(datastore, "active_configs")
            for conf in vendor_configs:
                active_configs[conf] = 1
            self._ensure_section(datastore, "active_vendors")[vendor_id] = 1
            self._ensure_section(datastore, "vendor_updates")[vendor_id] = (
                1 if enable_updates else 0
            )
        else:
            active_vendors = datastore.get("active_vendors") or {}
            vendor_updates = datastore.get("vendor_updates") or {}
            active_vendors.pop(vendor_id, None)
            vendor_updates.pop(vendor_id, None)
        self._save_datastore(datastore)


class cPanelFilesVendor(FilesVendor):
    """Installs, updates and removes a single ModSecurity vendor."""

    modsec_interface = cPanelModSecurity

    @alru_cache
    async def _get_apply_strategy(self):
        """Pick the strategy object according to the detected web server."""
        webserver = await self._get_web_server()
        if webserver in (APACHE, LITESPEED):
            return cPanelApachePackageApplyStrategy()
        return cPanelModsecVendorApplyStrategy()

    async def apply(self):
        """Remove obsoleted vendors, add/update this one, rebuild rules."""
        await self.modsec_interface.invalidate_installed_vendors_cache()
        await self._remove_obsoleted()
        await self._add_or_update_vendor()
        apply_strategy = await self._get_apply_strategy()
        await apply_strategy.rebuild_rules()

    async def _remove_obsoleted(self):
        """Remove installed vendors that must not coexist with this one."""
        logger_ = logging.getLogger("%s.%s" % (__name__, "_remove_obsoleted"))
        installed_vendors = set(
            await self.modsec_interface.modsec_vendor_list()
        )
        if installed_vendors:
            logger_.info(
                "Installed_vendors were detected: %r", installed_vendors
            )
        else:
            logger_.info("No installed_vendors were detected.")
            return
        # the intersection is a new set, so discarding from
        # installed_vendors inside the loop is safe
        for to_be_removed in installed_vendors & set(
            self._item.get("obsoletes", [])
        ):
            logger_.info("Removing obsoleted vendor %r", to_be_removed)
            await self._remove_vendor_by_id(to_be_removed)
            installed_vendors.discard(to_be_removed)
        # Here we are removing vendors that are no more appropriate for
        # the current setup (obsoleted ones (DEF-4434)) or those are
        # not appropriate for active webserver (apache, litespeed))
        for to_be_removed in installed_vendors:
            if (
                to_be_removed.startswith("imunify360")
                and to_be_removed != self.vendor_id
            ):
                logger_.info(
                    "Removing vendor %r which is inappropriate for this setup",
                    to_be_removed,
                )
                await self._remove_vendor_by_id(to_be_removed)

    async def _add_or_update_vendor(self):
        """
        Install the vendor, or update it when installed and enabled.

        An installed but disabled vendor is left untouched.
        """
        apply_strategy = await self._get_apply_strategy()
        installed_vendors = await self.modsec_interface.modsec_vendor_list()
        if self.vendor_id in installed_vendors:
            enabled_vendors = (
                await self.modsec_interface.enabled_modsec_vendor_list()
            )
            if self.vendor_id in enabled_vendors:
                try:
                    await apply_strategy.update_vendor(
                        self.vendor_id, self._item
                    )
                except CheckRunError as e:
                    logger.error(
                        "%r failed with error %r", MODSEC_VENDOR_BIN, e
                    )
                else:
                    logger.info(
                        "Successfully updated vendor %r.", self.vendor_id
                    )
        else:
            await apply_strategy.add_vendor(self.vendor_id, self._item)
            logger.info("Successfully installed vendor %r.", self.vendor_id)

    async def _remove_vendor_by_id(self, vendor_id):
        """Remove *vendor_id* using the current apply strategy."""
        apply_strategy = await self._get_apply_strategy()
        await apply_strategy.remove_vendor(vendor_id, None)

    async def _remove_vendor(self):
        """Remove the vendor described by this object."""
        await self._remove_vendor_by_id(self.vendor_id)

    def _vendor_id(self):
        """
        Derive the vendor id from the file name in the item URL.

        :return: file name without extension and ``meta_`` prefix, or
            None when the name does not start with ``meta_``
        """
        basename = os.path.basename(urlparse(self._item["url"]).path)
        basename_no_yaml, _ = os.path.splitext(basename)
        if basename_no_yaml.startswith("meta_"):
            return basename_no_yaml[len("meta_"):]
        else:
            return None


class cPanelFilesVendorList(FilesVendorList):
    """cPanel flavour of the vendor list."""

    files_vendor = cPanelFilesVendor
    modsec_interface = cPanelModSecurity
    _FULLY_COMPATIBLE_VENDORS = {"configserver"}

    @classmethod
    async def _get_compatible_name(cls, installed_vendors):
        """
        Build the vendor name matching the web server and ruleset suffix.

        :raise CompatiblityCheckFailed: when no web server is detected
        """
        web_server = await cls._get_web_server()
        if not web_server:
            raise cls.CompatiblityCheckFailed(
                "Web-server is not running, skipping "
                "imunify360 vendor installation",
                installed_vendors,
            )
        return MODSEC_NAME_TEMPLATE.format(
            ruleset_suffix=cls.get_ruleset_suffix(),
            webserver=web_server,
            panel="cpanel",
        )

    @classmethod
    def vendor_fit_panel(cls, item):
        """Return True if the item name ends with "cpanel"."""
        return item["name"].endswith("cpanel")


class cPanelFilesVendorApplyStrategy(ABC):
    """Interface of the ways a vendor can be added/updated/removed."""

    @abstractclassmethod
    async def add_vendor(cls, vendor_id: str, item: dict) -> None:
        pass

    @abstractclassmethod
    async def update_vendor(cls, vendor_id: str, item: dict) -> None:
        pass

    @abstractclassmethod
    async def remove_vendor(cls, vendor_id: str, item: dict | None) -> None:
        pass

    @abstractclassmethod
    async def rebuild_rules(cls) -> None:
        pass


class cPanelModsecVendorApplyStrategy(cPanelFilesVendorApplyStrategy):
    """Strategy that delegates everything to the ``modsec_vendor`` script."""

    @classmethod
    async def add_vendor(cls, vendor_id: str, item: dict) -> None:
        await _modsec_vendor_cmd("add", item["url"])

    @classmethod
    async def update_vendor(cls, vendor_id: str, item: dict) -> None:
        await check_run([MODSEC_VENDOR_BIN, "update", "--auto"])

    @classmethod
    async def remove_vendor(cls, vendor_id: str, item: dict | None) -> None:
        await _modsec_vendor_cmd("remove", vendor_id)

    @classmethod
    async def rebuild_rules(cls) -> None:
        # not needed when `modsec_vendor` is used
        pass


class cPanelApachePackageApplyStrategy(cPanelFilesVendorApplyStrategy):
    """Strategy that deploys the rules bundle shipped next to the item."""

    APACHE_MODSEC_VENDOR_CONF_DIR = Path(
        "/etc/apache2/conf.d/modsec_vendor_configs"
    )

    @classmethod
    def _unpack_bundle(cls, vendor_id: str, item: dict) -> None:
        """Replace the vendor rules dir with the content of the bundle."""
        bundle_src = Path(item["local_path"]).parent / f"{vendor_id}.zip"
        bundle_dst = cls.APACHE_MODSEC_VENDOR_CONF_DIR / vendor_id
        # drop the stale rules first so removed files do not linger
        shutil.rmtree(bundle_dst, ignore_errors=True)
        with zipfile.ZipFile(bundle_src, "r") as z:
            z.extractall(cls.APACHE_MODSEC_VENDOR_CONF_DIR)

    @classmethod
    async def add_vendor(cls, vendor_id: str, item: dict) -> None:
        """
        Add vendor via package:
        - create meta file /var/cpanel/modsec_vendors/meta_<vendor_id>.yaml
        - add record about the vendor to
          /var/cpanel/modsec_vendors/installed_from.yaml
        - copy rules bundle to /etc/apache2/conf.d/modsec_vendor_configs
        - enable vendor
        - update rules and vendor config in
          /var/cpanel/modsec_cpanel_conf_datastore
        """
        cls._make_pkg_yaml(vendor_id)
        await cls._update_installed_from(vendor_id)
        cls._unpack_bundle(vendor_id, item)
        await _modsec_vendor_cmd("enable", vendor_id)
        await _modsec_vendor_cmd("enable-configs", vendor_id)
        await _modsec_vendor_cmd("enable-updates", vendor_id)
        await cls.update_conf_datastore(vendor_id)

    @classmethod
    async def update_vendor(cls, vendor_id: str, item: dict) -> None:
        """
        Update vendor via package, with converting its config to package
        if needed.

        If the package needs to be converted:
        - create meta file /var/cpanel/modsec_vendors/meta_<vendor_id>.yaml
        - add record about the vendor to
          /var/cpanel/modsec_vendors/installed_from.yaml
        Then:
        - copy rules bundle to /etc/apache2/conf.d/modsec_vendor_configs
        - update rules and vendor config in
          /var/cpanel/modsec_cpanel_conf_datastore
        """
        if not cls._is_pkg(vendor_id):
            cls._make_pkg_yaml(vendor_id)
            await cls._update_installed_from(vendor_id)
            updates_enabled = True
        else:
            updates_enabled = await cls._check_if_updates_enabled(vendor_id)
        cls._unpack_bundle(vendor_id, item)
        if not updates_enabled:
            await _modsec_vendor_cmd("disable-updates", vendor_id)
        await cls.update_conf_datastore(
            vendor_id, enable_updates=updates_enabled
        )

    @classmethod
    async def remove_vendor(cls, vendor_id: str, item: dict | None) -> None:
        """
        Remove vendor.

        If the package was added via modsec_vendor, use
        `modsec_vendor remove`. Otherwise:
        - remove meta file /var/cpanel/modsec_vendors/meta_<vendor_id>.yaml
        - remove record /var/cpanel/modsec_vendors/installed_from.yaml
        - remove rules bundle
          /etc/apache2/conf.d/modsec_vendor_configs/<vendor_id>
        - update rules and vendor config in
          /var/cpanel/modsec_cpanel_conf_datastore
        - clear caches

        :param item: not used by this strategy, callers may pass None
        """
        if not cls._is_pkg(vendor_id):
            await _modsec_vendor_cmd("remove", vendor_id)
            return
        await cls._remove_installed_from(vendor_id)
        pkg_yaml_path = cls._make_pkg_yaml_path(vendor_id)
        pkg_yaml_path.unlink()
        shutil.rmtree(
            cls.APACHE_MODSEC_VENDOR_CONF_DIR / vendor_id,
            ignore_errors=True,
        )
        # the rules dir is gone by now, so this drops the vendor records
        await cls.update_conf_datastore(vendor_id)
        await cls._clear_cache(vendor_id)
        Path("/var/cpanel/modsec_vendors/installed_from.cache").unlink(
            missing_ok=True
        )
        Path("/var/cpanel/modsec_cpanel_conf_datastore.cache").unlink(
            missing_ok=True
        )
        await cls.rebuild_rules()

    @classmethod
    async def rebuild_rules(cls) -> None:
        """Run the cPanel perl one-liner on ``ModsecCpanelConf``."""
        await check_run(
            (
                "/usr/local/cpanel/3rdparty/bin/perl",
                "-MWhostmgr::ModSecurity::ModsecCpanelConf",
                "-e",
                (
                    "Whostmgr::ModSecurity::ModsecCpanelConf->new->manipulate(sub {})"
                ),
            )
        )

    @classmethod
    def _make_pkg_yaml_path(cls, vendor_id):
        """Return the path of the vendor meta file."""
        return MODSEC_VENDOR_VAR_DIR / f"meta_{vendor_id}.yaml"

    @classmethod
    def _make_pkg_yaml(cls, vendor_id):
        """Write the vendor meta file that marks the vendor as a package."""
        # NOTE(review): the layout of this YAML literal was restored from a
        # whitespace-collapsed source; `is_pkg` must stay below `attributes`
        # because that is where _is_pkg() reads it -- confirm the rest
        with open(cls._make_pkg_yaml_path(vendor_id), "w") as f:
            content = textwrap.dedent(
                r"""
                ---
                attributes:
                  description: Imunify360 ModSecurity Rules For Apache
                  name: Imunify360 Apache Rule Set
                  vendor_url: https://docs.imunify360.com/
                  is_pkg: imunify360-firewall-cpanel
                  is_rpm: imunify360-firewall-cpanel
                """
            )
            f.write(content)

    @classmethod
    def _is_pkg(cls, vendor_id):
        """
        Tell whether the vendor meta file marks it as our package.

        :return bool: True only if the meta file is readable and its
            ``attributes.is_pkg`` equals "imunify360-firewall-cpanel"
        """
        try:
            with open(cls._make_pkg_yaml_path(vendor_id), "r") as f:
                pkg_data = yaml.safe_load(f)
        except FileNotFoundError:
            logger.warning("Package meta not found")
            return False
        except yaml.YAMLError:
            logger.warning("Package meta corrupted")
            return False
        # an empty file is parsed as None, other content may be any type
        attributes = (
            pkg_data.get("attributes") if isinstance(pkg_data, dict) else None
        )
        if not isinstance(attributes, dict):
            return False
        return attributes.get("is_pkg") == "imunify360-firewall-cpanel"

    @classmethod
    async def _clear_cache(cls, vendor_id: str) -> None:
        """Remove the ``meta_<vendor_id>.cache`` file if it exists."""
        (MODSEC_VENDOR_VAR_DIR / f"meta_{vendor_id}.cache").unlink(
            missing_ok=True
        )

    @classmethod
    async def _update_installed_from(cls, vendor_id: str) -> None:
        """Add the vendor record to installed_from.yaml (via perl)."""
        # NOTE(review): vendor_id is interpolated into the perl source, so
        # it must not contain quotes or perl sigils -- consider passing it
        # as an extra $ARGV item instead
        await check_run(
            (
                "/usr/local/cpanel/3rdparty/bin/perl",
                "-MCpanel::CachedDataStore",
                "-e",
                (
                    f'my $hr=Cpanel::CachedDataStore::loaddatastore($ARGV[0]);$hr->{{data}}{{"{vendor_id}"}}'
                    ' = { distribution => "imunify360-firewall-cpanel", url'
                    ' => "N/A"};Cpanel::CachedDataStore::savedatastore($ARGV[0],'
                    " { data => $hr->{data} })"
                ),
                str(MODSEC_VENDOR_VAR_DIR / "installed_from.yaml"),
            )
        )

    @classmethod
    async def _remove_installed_from(cls, vendor_id: str) -> None:
        """Delete the vendor record from installed_from.yaml (via perl)."""
        # NOTE(review): same interpolation caveat as _update_installed_from
        await check_run(
            (
                "/usr/local/cpanel/3rdparty/bin/perl",
                "-MCpanel::CachedDataStore",
                "-e",
                (
                    "my $hr=Cpanel::CachedDataStore::loaddatastore($ARGV[0]);delete"
                    f' $hr->{{data}}{{"{vendor_id}"}};Cpanel::CachedDataStore::savedatastore($ARGV[0],'
                    " { data => $hr->{data} })"
                ),
                str(MODSEC_VENDOR_VAR_DIR / "installed_from.yaml"),
            )
        )

    @classmethod
    async def _check_if_updates_enabled(cls, vendor_id):
        """Return the ``vendor_updates`` flag of the vendor (False if none)."""
        enabled = ModsecDatastore().get("vendor_updates").get(vendor_id, False)
        return enabled

    @classmethod
    async def update_conf_datastore(cls, vendor_id, enable_updates=True):
        """
        Sync the datastore records of the vendor with its rules directory.

        Every ``*.conf`` file directly inside the vendor directory is
        registered; when the directory is missing or has no such files an
        empty list is passed on to ``update_vendor_data``.
        """
        vendor_dir = cls.APACHE_MODSEC_VENDOR_CONF_DIR / vendor_id
        conf_files = []
        if vendor_dir.exists():
            for file_path in vendor_dir.iterdir():
                if file_path.is_file() and file_path.suffix == ".conf":
                    relative_path = (
                        f"modsec_vendor_configs/{vendor_id}/{file_path.name}"
                    )
                    conf_files.append(relative_path)
        ModsecDatastore().update_vendor_data(
            vendor_id, conf_files, enable_updates
        )