ÿØÿàJFIFÿþ ÿÛC       ÿÛC ÿÀÿÄÿÄ"#QrÿÄÿÄ&1!A"2qQaáÿÚ ?Øy,æ/3JæÝ¹È߲؋5êXw²±ÉyˆR”¾I0ó2—PI¾IÌÚiMö¯–þrìN&"KgX:Šíµ•nTJnLK„…@!‰-ý ùúmë;ºgµŒ&ó±hw’¯Õ@”Ü— 9ñ-ë.²1<yà‚¹ïQÐU„ہ?.’¦èûbß±©Ö«Âw*VŒ) `$‰bØÔŸ’ëXÖ-ËTÜíGÚ3ð«g Ÿ§¯—Jx„–’U/ÂÅv_s(Hÿ@TñJÑãõçn­‚!ÈgfbÓc­:él[ðQe 9ÀPLbÃãCµm[5¿ç'ªjglå‡Ûí_§Úõl-;"PkÞÞÁQâ¼_Ñ^¢SŸx?"¸¦ùY騐ÒOÈ q’`~~ÚtËU¹CڒêV  I1Áß_ÿÙ""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import logging import os import shlex import subprocess from collections import defaultdict from datetime import datetime, timedelta from functools import lru_cache from typing import Optional from defence360agent.contracts.config import ( choose_value_from_config, MalwareScanScheduleInterval as Interval, ) from defence360agent.contracts.license import LicenseCLN from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.subsys.panels.plesk import Plesk from defence360agent.utils import async_lru_cache, atomic_rewrite from imav.malwarelib.model import MalwareHit from imav.malwarelib.scan.queue_supervisor_sync import ( QueueSupervisorSync as ScanQueue, ) from imav.malwarelib.utils import user_list from imav.model.wordpress import WPSite from imav.wordpress import ( WP_CLI_WRAPPER_PATH, ) from imav.wordpress.exception import PHPError CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user" CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl" logger = logging.getLogger(__name__) @async_lru_cache(ttl=60) async def get_domain_paths() -> dict[str, list[str]]: """ Get a mapping of docroots to their associated domains, with caching. """ hosting_panel = HostingPanel() panel_paths = await hosting_panel.get_domain_paths() docroot_map = defaultdict(list) for domain, docroots in panel_paths.items(): for docroot in docroots: docroot_map[docroot].append(domain) return docroot_map def wp_wrapper(php_path: str, docroot: str) -> list: """Get wp cli common command list""" return [str(WP_CLI_WRAPPER_PATH), php_path, docroot] @lru_cache(maxsize=1) def get_cagefs_enabled_users() -> set: """Get the list of users enabled for CageFS.""" if not os.path.isfile(CAGEFS_CTL_PATH) or not os.access( CAGEFS_CTL_PATH, os.X_OK ): return set() result = subprocess.run( [CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True ) if result.returncode != 0: return set() lines = result.stdout.strip().split("\n") return set(lines[1:]) # Skip the first line which is a summary def clear_get_cagefs_enabled_users_cache(): """Clear the cache for get_cagefs_enabled_users.""" get_cagefs_enabled_users.cache_clear() def build_command_for_user(username: str, args: list) -> list: """Build the necessary command to run the given cmdline args with specified user.""" if username in get_cagefs_enabled_users(): if os.path.isfile(CAGEFS_ENTER_PATH) and os.access( CAGEFS_ENTER_PATH, os.X_OK ): return [ CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username, *args, ] return [ "su", "-s", "/bin/bash", username, "-c", shlex.join(args), ] async def get_domains_for_docroot( docroot: str, domain_to_exclude: str ) -> list[str]: """ Get all domains associated with a given document root, excluding one domain. It's panel-agnostic and uses a cached mapping. """ docroot_map = await get_domain_paths() all_domains = docroot_map.get(docroot, []) return [domain for domain in all_domains if domain != domain_to_exclude] async def get_php_binary_path(site: WPSite, username: str) -> Optional[str]: """Determine PHP binary path for the given WPSite.""" from clcommon.cpapi import ( get_domains_php_info, get_installed_php_versions, ) domains_php_info = get_domains_php_info() installed_php_versions = get_installed_php_versions() def find_php_binary_for_domain(domain: str) -> Optional[str]: domain_info = domains_php_info.get(domain) if not domain_info or domain_info.get("username") != username: return None php_display_version = domain_info.get("display_version") if not php_display_version: return None for php_version in installed_php_versions: if php_version.get("identifier") == php_display_version: return php_version.get("bin") return None # First, try with the main domain of the site. php_binary_path = find_php_binary_for_domain(site.domain) if php_binary_path: return php_binary_path # If not found, try with other domains for the site's docroot. domains = await get_domains_for_docroot( site.docroot, domain_to_exclude=site.domain ) for domain in domains: php_binary_path = find_php_binary_for_domain(domain) if php_binary_path: return php_binary_path raise PHPError( f"PHP binary was not identified for docroot: {site.docroot}, username:" f" {username}" ) def get_malware_history(username: str) -> list: """ Get malware history for the specified user. This is an equivalent of calling `imunify360-agent malware history list --user {username}`. `` """ (max_count, hits) = MalwareHit.malicious_list(user=username) return hits async def get_last_scan(sink, username: str) -> dict: """ Get the last scan for the specified user. This is an equivalent of calling `imunify360-agent malware user list --user {username}`. """ queue = ScanQueue(sink) _, users = await user_list.fetch_user_list( queue.get_scans_from_paths, match={username} ) if not users: return {} users = user_list.sort(users, "scan_date", desc=True) return users[0] def calculate_next_scan_timestamp(interval, hour, day_of_month, day_of_week): """ Calculate the next scan timestamp based on schedule configuration. Args: interval: Scan interval (DAY, WEEK, MONTH, or NONE) hour: Hour of day to run scan (0-23) day_of_month: Day of month to run scan (1-31) day_of_week: Day of week to run scan (0-6, where 0=Sunday) Returns: Timestamp of next scan, or None if interval is NONE """ today = datetime.utcnow() if interval == Interval.DAY: next_scan = today.replace( hour=hour, minute=0, second=0, microsecond=0, ) if today >= next_scan: next_scan += timedelta(days=1) return next_scan.timestamp() if interval == Interval.WEEK: # today.weekday() returns 0 for Monday, 6 for Sunday, but day_of_week uses 0 for Sunday, # 1 for Monday, ..., 6 for Saturday. So we need to adjust the calculation. days_ahead = (day_of_week - (today.weekday() + 1) % 7 + 7) % 7 if days_ahead == 0 and today.hour >= hour: days_ahead = 7 next_scan_date = today + timedelta(days=days_ahead) return next_scan_date.replace( hour=hour, minute=0, second=0, microsecond=0 ).timestamp() if interval == Interval.MONTH: from calendar import monthrange def find_next_suitable_month(year, month, days): """Find the next month that has at least given number of days.""" current_year, current_month = year, month # Always start with the next month when advancing current_month += 1 if current_month > 12: current_month = 1 current_year += 1 # Keep advancing months until we find one with enough days while True: days_in_month = monthrange(current_year, current_month)[1] if days <= days_in_month: return current_year, current_month current_month += 1 if current_month > 12: current_month = 1 current_year += 1 # Check if we need to advance to next month should_advance_month = ( # Today is after the scheduled day, scan already ran this month today.day > day_of_month # Today is the scheduled day and the hour is after the scheduled hour, scan already ran earlier today or (today.day == day_of_month and today.hour >= hour) # Current month doesn't have enough days, scan should run next suitable month or day_of_month > monthrange(today.year, today.month)[1] ) if should_advance_month: # Find the next month that can accommodate the configured day next_year, next_month = find_next_suitable_month( today.year, today.month, day_of_month ) next_scan_date = today.replace( day=day_of_month, # Use the actual configured day month=next_month, year=next_year, hour=hour, minute=0, second=0, microsecond=0, ) else: # Current month can accommodate the configured day next_scan_date = today.replace( day=day_of_month, hour=hour, minute=0, second=0, microsecond=0, ) return next_scan_date.timestamp() def prepare_scan_data( last_scan_time: float, next_scan_time: float, username: str, site: WPSite, malware_by_site: dict, ) -> dict: """ Prepare scan data JSON for a WordPress site. Args: last_scan_time: Timestamp of the last scan next_scan_time: Timestamp of the next scheduled scan username: Username of the site owner site: WordPress site object malware_by_site: Dictionary mapping site docroots to their malware hits Returns: dict: JSON data ready to be written to scan_data.php. The response includes: - lastScanTimestamp: Timestamp of the last scan - nextScanTimestamp: Timestamp of the next scheduled scan - username: Username of the site owner - malware: List of malware hits for the site - config: Configuration items for the site - license: License information including status and eligibility for Imunify patch """ # Define the config sections and options needed config_sections = [ ("MALWARE_SCANNING", "enable_scan_cpanel"), ("MALWARE_SCANNING", "default_action"), ("PROACTIVE_DEFENCE", "blamer"), ] # Build the config items config_items = {} for section, option in config_sections: if section not in config_items: config_items[section] = {} try: value, _ = choose_value_from_config( section, option, username=username, ) except KeyError: value = None config_items[section][option] = value return { "lastScanTimestamp": last_scan_time, "nextScanTimestamp": next_scan_time, "username": username, "malware": malware_by_site.get(site.docroot, []), "config": config_items, "license": LicenseCLN.license_info(), } def write_plugin_data_file_atomically( file_path, content: str, uid: int, gid: int ) -> None: """ Helper function to write a plugin data file atomically with optional touch. Args: file_path: Path to the file to write content: Content to write to the file uid: User ID for file ownership gid: Group ID for file ownership """ if not file_path.exists(): file_path.touch() # Set permissions based on hosting panel permissions = 0o440 if HostingPanel().NAME == Plesk.NAME else 0o400 atomic_rewrite( file_path, content, backup=False, uid=uid, gid=gid, permissions=permissions, )