Edit File: permissions.py
import logging from asyncio.coroutines import iscoroutinefunction from pathlib import Path from typing import Optional from defence360agent.contracts.config import MyImunifyConfig, PermissionsConfig from defence360agent.contracts.license import LicenseCLN from defence360agent.feature_management.constants import AV_REPORT, FULL from defence360agent.feature_management.model import FeatureManagementPerms from defence360agent.myimunify.model import MyImunify from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.subsys.panels.plesk import Plesk from defence360agent.utils import importer from imav.malwarelib.api.imunify_patch_subscription import ( ImunifyPatchSubscriptionAPI, ) logger = logging.getLogger(__name__) PERMISSIONS = ( MS_VIEW, MS_CLEAN, MS_CLEAN_REQUIRES_MYIMUNIFY_PROTECTION, MS_ON_DEMAND_SCAN, MS_ON_DEMAND_SCAN_WITHOUT_RATE_LIMIT, MS_IGNORE_LIST_EDIT, MS_CONFIG_DEFAULT_ACTION_EDIT, MS_IMUNIFY_PATCH_ENABLED, MS_IMUNIFY_PATCH_ELIGIBLE_TO_PURCHASE, PD_VIEW, PD_CONFIG_MODE_EDIT, ) = ( "malware_scanner.view", "malware_scanner.clean", "malware_scanner.clean_requires_myimunify_protection", "malware_scanner.on_demand.scan", "malware_scanner.on_demand.scan_without_rate_limit", "malware_scanner.ignore_list.edit", "malware_scanner.config.default_action.edit", "malware_scanner.imunify_patch.enabled", "malware_scanner.imunify_patch.eligible_to_purchase", "proactive_defense.view", "proactive_defense.config.mode.edit", ) GLOBAL_CONFDIR = Path("/etc/sysconfig/imunify360") def is_plesk_service_plan_enabled() -> bool: return ( HostingPanel().NAME == Plesk.NAME and PermissionsConfig.USE_PLESK_SERVICE_PLAN ) def myimunify_protection_enabled(user: Optional[str] = None) -> bool: return MyImunify.get_protection(user) def ms_view(user: Optional[str] = None) -> bool: if user is None: return True return FeatureManagementPerms.get_perm(user).av in ( AV_REPORT, FULL, ) def ms_clean(user: Optional[str] = None) -> bool: if LicenseCLN.is_free() or not LicenseCLN.is_valid(): return False if user is None: return True if is_plesk_service_plan_enabled(): # should be handled by Plesk extension return True return FeatureManagementPerms.get_perm(user).av == FULL def ms_clean_requires_myimunify_protection(user: Optional[str] = None): if MyImunifyConfig.ENABLED: return myimunify_protection_enabled(user) return ms_clean(user) def ms_on_demand_scan(user: Optional[str] = None) -> bool: if user is None: return True if MyImunifyConfig.ENABLED: # on-demand scan is available for both Basic and Pro subscriptions return True if is_plesk_service_plan_enabled(): # should be handled by Plesk extension return True return PermissionsConfig.ALLOW_MALWARE_SCAN def ms_on_demand_scan_without_rate_limit( user: Optional[str] = None, ) -> bool: if MyImunifyConfig.ENABLED: return myimunify_protection_enabled(user) return PermissionsConfig.ALLOW_MALWARE_SCAN def ms_ignore_list_edit(user: Optional[str] = None): if user is None: return True if MyImunifyConfig.ENABLED: # so far, MyImunify doesn't allow to the user editing ignore list return False return PermissionsConfig.USER_IGNORE_LIST def ms_config_default_action_edit(user: Optional[str] = None): if user is None: return True if MyImunifyConfig.ENABLED: # so far, MyImunify doesn't allow to the user # editing default malware action return False return PermissionsConfig.USER_OVERRIDE_MALWARE_ACTIONS ms_imunify_patch_enabled = importer.get( module="imav.contracts.permissions", name="is_imunify_patch_enabled", default=lambda _: False, ) has_imunify_patch_subscriptions = importer.get( module="imav.malwarelib.api.imunify_patch_subscription", name="has_imunify_patch_subscriptions", default=lambda _: False, ) async def ms_imunify_patch_eligible_to_purchase( user: str | None = None, ) -> bool: return ( LicenseCLN.is_eligible_for_imunify_patch() or has_imunify_patch_subscriptions(user) or ( await ImunifyPatchSubscriptionAPI.get_purchase_eligibility() ).eligible ) def pd_view(user: Optional[str] = None): if user is None: return True return FeatureManagementPerms.get_perm(user).proactive == FULL def pd_config_mode_edit(user: Optional[str] = None): if user is None: return True if MyImunifyConfig.ENABLED: return False return PermissionsConfig.USER_OVERRIDE_PROACTIVE_DEFENSE HAS_PERMISSION = { MS_VIEW: ms_view, MS_CLEAN: ms_clean, MS_CLEAN_REQUIRES_MYIMUNIFY_PROTECTION: ( ms_clean_requires_myimunify_protection ), MS_ON_DEMAND_SCAN: ms_on_demand_scan, MS_ON_DEMAND_SCAN_WITHOUT_RATE_LIMIT: ms_on_demand_scan_without_rate_limit, MS_IGNORE_LIST_EDIT: ms_ignore_list_edit, MS_CONFIG_DEFAULT_ACTION_EDIT: ms_config_default_action_edit, MS_IMUNIFY_PATCH_ENABLED: ms_imunify_patch_enabled, MS_IMUNIFY_PATCH_ELIGIBLE_TO_PURCHASE: ms_imunify_patch_eligible_to_purchase, PD_VIEW: pd_view, PD_CONFIG_MODE_EDIT: pd_config_mode_edit, } async def has_permission(permission, user) -> bool: func = HAS_PERMISSION[permission] if iscoroutinefunction(func): return await HAS_PERMISSION[permission](user) return func(user) async def check_permission(permission, user) -> None: func = HAS_PERMISSION[permission] if iscoroutinefunction(func): if not await func(user): raise PermissionError("notifications.generalPermissionError") else: if not func(user): raise PermissionError("notifications.generalPermissionError") async def permissions_list(user) -> list[str]: return [ permission for permission in PERMISSIONS if await has_permission(permission, user) ]