ÿØÿà JFIF ÿþ
ÿÛ C
ÿÛ C ÿÀ ÿÄ ÿÄ " #QrÿÄ ÿÄ & 1! A"2qQaáÿÚ ? Øy,æ/3JæÝ¹Èß²Ø5êXw²±ÉyR¾I0ó2PI¾IÌÚiMö¯þrìN&"KgX:íµnTJnLK
@!-ýùúmë;ºgµ&ó±hw¯Õ@Ü9ñ-ë.²1<yà¹ïQÐUÛ?.¦èûbß±©Ö«Âw*V) `$bØÔëXÖ-ËTÜíGÚ3ð«g §¯JxU/ÂÅv_s(Hÿ @TñJÑãõçn!ÈgfbÓc:él[ðQe9ÀPLbÃãCµm[5¿ç'ªjglåÛí_§Úõl-;"PkÞÞÁQâ¼_Ñ^¢S x?"¸¦ùYé¨ÒOÈ q`~~ÚtËU¹CÚêV I1Áß_ÿÙ"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import logging
import os
import shlex
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Optional
from defence360agent.contracts.config import (
choose_value_from_config,
MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.subsys.panels.plesk import Plesk
from defence360agent.utils import async_lru_cache, atomic_rewrite
from imav.malwarelib.model import MalwareHit
from imav.malwarelib.scan.queue_supervisor_sync import (
QueueSupervisorSync as ScanQueue,
)
from imav.malwarelib.utils import user_list
from imav.model.wordpress import WPSite
from imav.wordpress import (
WP_CLI_WRAPPER_PATH,
)
from imav.wordpress.exception import PHPError
CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"
logger = logging.getLogger(__name__)
@async_lru_cache(ttl=60)
async def get_domain_paths() -> dict[str, list[str]]:
"""
Get a mapping of docroots to their associated domains, with caching.
"""
hosting_panel = HostingPanel()
panel_paths = await hosting_panel.get_domain_paths()
docroot_map = defaultdict(list)
for domain, docroots in panel_paths.items():
for docroot in docroots:
docroot_map[docroot].append(domain)
return docroot_map
def wp_wrapper(php_path: str, docroot: str) -> list:
"""Get wp cli common command list"""
return [str(WP_CLI_WRAPPER_PATH), php_path, docroot]
@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
"""Get the list of users enabled for CageFS."""
if not os.path.isfile(CAGEFS_CTL_PATH) or not os.access(
CAGEFS_CTL_PATH, os.X_OK
):
return set()
result = subprocess.run(
[CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True
)
if result.returncode != 0:
return set()
lines = result.stdout.strip().split("\n")
return set(lines[1:]) # Skip the first line which is a summary
def clear_get_cagefs_enabled_users_cache():
"""Clear the cache for get_cagefs_enabled_users."""
get_cagefs_enabled_users.cache_clear()
def build_command_for_user(username: str, args: list) -> list:
"""Build the necessary command to run the given cmdline args with specified user."""
if username in get_cagefs_enabled_users():
if os.path.isfile(CAGEFS_ENTER_PATH) and os.access(
CAGEFS_ENTER_PATH, os.X_OK
):
return [
CAGEFS_ENTER_PATH,
"--no-io-and-memory-limit",
username,
*args,
]
return [
"su",
"-s",
"/bin/bash",
username,
"-c",
shlex.join(args),
]
async def get_domains_for_docroot(
docroot: str, domain_to_exclude: str
) -> list[str]:
"""
Get all domains associated with a given document root, excluding one domain.
It's panel-agnostic and uses a cached mapping.
"""
docroot_map = await get_domain_paths()
all_domains = docroot_map.get(docroot, [])
return [domain for domain in all_domains if domain != domain_to_exclude]
async def get_php_binary_path(site: WPSite, username: str) -> Optional[str]:
"""Determine PHP binary path for the given WPSite."""
from clcommon.cpapi import (
get_domains_php_info,
get_installed_php_versions,
)
domains_php_info = get_domains_php_info()
installed_php_versions = get_installed_php_versions()
def find_php_binary_for_domain(domain: str) -> Optional[str]:
domain_info = domains_php_info.get(domain)
if not domain_info or domain_info.get("username") != username:
return None
php_display_version = domain_info.get("display_version")
if not php_display_version:
return None
for php_version in installed_php_versions:
if php_version.get("identifier") == php_display_version:
return php_version.get("bin")
return None
# First, try with the main domain of the site.
php_binary_path = find_php_binary_for_domain(site.domain)
if php_binary_path:
return php_binary_path
# If not found, try with other domains for the site's docroot.
domains = await get_domains_for_docroot(
site.docroot, domain_to_exclude=site.domain
)
for domain in domains:
php_binary_path = find_php_binary_for_domain(domain)
if php_binary_path:
return php_binary_path
raise PHPError(
f"PHP binary was not identified for docroot: {site.docroot}, username:"
f" {username}"
)
def get_malware_history(username: str) -> list:
"""
Get malware history for the specified user.
This is an equivalent of calling `imunify360-agent malware history list --user {username}`.
``
"""
(max_count, hits) = MalwareHit.malicious_list(user=username)
return hits
async def get_last_scan(sink, username: str) -> dict:
"""
Get the last scan for the specified user.
This is an equivalent of calling `imunify360-agent malware user list --user {username}`.
"""
queue = ScanQueue(sink)
_, users = await user_list.fetch_user_list(
queue.get_scans_from_paths, match={username}
)
if not users:
return {}
users = user_list.sort(users, "scan_date", desc=True)
return users[0]
def calculate_next_scan_timestamp(interval, hour, day_of_month, day_of_week):
"""
Calculate the next scan timestamp based on schedule configuration.
Args:
interval: Scan interval (DAY, WEEK, MONTH, or NONE)
hour: Hour of day to run scan (0-23)
day_of_month: Day of month to run scan (1-31)
day_of_week: Day of week to run scan (0-6, where 0=Sunday)
Returns:
Timestamp of next scan, or None if interval is NONE
"""
today = datetime.utcnow()
if interval == Interval.DAY:
next_scan = today.replace(
hour=hour,
minute=0,
second=0,
microsecond=0,
)
if today >= next_scan:
next_scan += timedelta(days=1)
return next_scan.timestamp()
if interval == Interval.WEEK:
# today.weekday() returns 0 for Monday, 6 for Sunday, but day_of_week uses 0 for Sunday,
# 1 for Monday, ..., 6 for Saturday. So we need to adjust the calculation.
days_ahead = (day_of_week - (today.weekday() + 1) % 7 + 7) % 7
if days_ahead == 0 and today.hour >= hour:
days_ahead = 7
next_scan_date = today + timedelta(days=days_ahead)
return next_scan_date.replace(
hour=hour, minute=0, second=0, microsecond=0
).timestamp()
if interval == Interval.MONTH:
from calendar import monthrange
def find_next_suitable_month(year, month, days):
"""Find the next month that has at least given number of days."""
current_year, current_month = year, month
# Always start with the next month when advancing
current_month += 1
if current_month > 12:
current_month = 1
current_year += 1
# Keep advancing months until we find one with enough days
while True:
days_in_month = monthrange(current_year, current_month)[1]
if days <= days_in_month:
return current_year, current_month
current_month += 1
if current_month > 12:
current_month = 1
current_year += 1
# Check if we need to advance to next month
should_advance_month = (
# Today is after the scheduled day, scan already ran this month
today.day > day_of_month
# Today is the scheduled day and the hour is after the scheduled hour, scan already ran earlier today
or (today.day == day_of_month and today.hour >= hour)
# Current month doesn't have enough days, scan should run next suitable month
or day_of_month > monthrange(today.year, today.month)[1]
)
if should_advance_month:
# Find the next month that can accommodate the configured day
next_year, next_month = find_next_suitable_month(
today.year, today.month, day_of_month
)
next_scan_date = today.replace(
day=day_of_month, # Use the actual configured day
month=next_month,
year=next_year,
hour=hour,
minute=0,
second=0,
microsecond=0,
)
else:
# Current month can accommodate the configured day
next_scan_date = today.replace(
day=day_of_month,
hour=hour,
minute=0,
second=0,
microsecond=0,
)
return next_scan_date.timestamp()
def prepare_scan_data(
last_scan_time: float,
next_scan_time: float,
username: str,
site: WPSite,
malware_by_site: dict,
) -> dict:
"""
Prepare scan data JSON for a WordPress site.
Args:
last_scan_time: Timestamp of the last scan
next_scan_time: Timestamp of the next scheduled scan
username: Username of the site owner
site: WordPress site object
malware_by_site: Dictionary mapping site docroots to their malware hits
Returns:
dict: JSON data ready to be written to scan_data.php. The response includes:
- lastScanTimestamp: Timestamp of the last scan
- nextScanTimestamp: Timestamp of the next scheduled scan
- username: Username of the site owner
- malware: List of malware hits for the site
- config: Configuration items for the site
- license: License information including status and eligibility for Imunify patch
"""
# Define the config sections and options needed
config_sections = [
("MALWARE_SCANNING", "enable_scan_cpanel"),
("MALWARE_SCANNING", "default_action"),
("PROACTIVE_DEFENCE", "blamer"),
]
# Build the config items
config_items = {}
for section, option in config_sections:
if section not in config_items:
config_items[section] = {}
try:
value, _ = choose_value_from_config(
section,
option,
username=username,
)
except KeyError:
value = None
config_items[section][option] = value
return {
"lastScanTimestamp": last_scan_time,
"nextScanTimestamp": next_scan_time,
"username": username,
"malware": malware_by_site.get(site.docroot, []),
"config": config_items,
"license": LicenseCLN.license_info(),
}
def write_plugin_data_file_atomically(
file_path, content: str, uid: int, gid: int
) -> None:
"""
Helper function to write a plugin data file atomically with optional touch.
Args:
file_path: Path to the file to write
content: Content to write to the file
uid: User ID for file ownership
gid: Group ID for file ownership
"""
if not file_path.exists():
file_path.touch()
# Set permissions based on hosting panel
permissions = 0o440 if HostingPanel().NAME == Plesk.NAME else 0o400
atomic_rewrite(
file_path,
content,
backup=False,
uid=uid,
gid=gid,
permissions=permissions,
)