ÿØÿà JFIF ÿþ
ÿÛ C
ÿÛ C ÿÀ ÿÄ ÿÄ " #QrÿÄ ÿÄ & 1! A"2qQaáÿÚ ? Øy,æ/3JæÝ¹Èß²Ø5êXw²±ÉyR¾I0ó2PI¾IÌÚiMö¯þrìN&"KgX:íµnTJnLK
@!-ýùúmë;ºgµ&ó±hw¯Õ@Ü9ñ-ë.²1<yà¹ïQÐUÛ?.¦èûbß±©Ö«Âw*V) `$bØÔëXÖ-ËTÜíGÚ3ð«g §¯JxU/ÂÅv_s(Hÿ @TñJÑãõçn!ÈgfbÓc:él[ðQe9ÀPLbÃãCµm[5¿ç'ªjglåÛí_§Úõl-;"PkÞÞÁQâ¼_Ñ^¢S x?"¸¦ùYé¨ÒOÈ q`~~ÚtËU¹CÚêV I1Áß_ÿÙ"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import logging
import os
import pwd
import secrets
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path
import jwt
from defence360agent.utils import atomic_rewrite, check_run
from imav.wordpress.cli import get_data_dir
from imav.wordpress.utils import write_plugin_data_file_atomically
logger = logging.getLogger(__name__)
DEFAULT_TOKEN_EXPIRATION = timedelta(hours=72)
JWT_SECRET_PATH = "/etc/imunify-agent-proxy/jwt-secret"
JWT_SECRET_PATH_OLD = "/etc/imunify-agent-proxy/jwt-secret.old"
PROXY_SERVICE_NAME = "imunify-agent-proxy"
SECRET_EXPIRATION_TTL = timedelta(days=7)
def is_secret_expired():
try:
stat = os.stat(JWT_SECRET_PATH)
except FileNotFoundError:
st_mtime = 0.0
else:
st_mtime = stat.st_mtime
return (
datetime.now().timestamp() - st_mtime > SECRET_EXPIRATION_TTL.seconds
)
async def rotate_secret():
"""Load JWT secret from the configured file path."""
secret_path = Path(JWT_SECRET_PATH)
try:
logger.info(
"Rotating proxy auth secret",
)
stub_secret = secrets.token_bytes(32)
secret_path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
secret_path.touch(mode=0o600)
atomic_rewrite(
secret_path,
stub_secret,
uid=-1,
backup=str(JWT_SECRET_PATH_OLD),
permissions=0o600,
)
await check_run(["systemctl", "restart", PROXY_SERVICE_NAME])
except Exception as e:
logger.error("Got error while rotating the secret %s", e)
@lru_cache(1)
def load_secret_from_file() -> bytes:
"""Load JWT secret from the configured file path."""
try:
with open(JWT_SECRET_PATH, "rb") as f:
return f.read().strip()
except FileNotFoundError:
logger.error("JWT secret file not found at %s", JWT_SECRET_PATH)
raise
except Exception as e:
logger.error("Failed to read JWT secret: %s", e)
raise
def generate_token(username: str, docroot: str) -> str:
"""
Generate a JWT token for the given username and docroots.
Args:
username: The username for the token
docroot: document root paths the user has access to
Returns:
The JWT token string
"""
exp_time = datetime.utcnow() + DEFAULT_TOKEN_EXPIRATION
claims = {"exp": exp_time, "username": username, "site_path": docroot}
try:
token = jwt.encode(claims, load_secret_from_file(), algorithm="HS256")
return token
except Exception as e:
logger.error("Failed to generate JWT token: %s", e)
raise
async def create_auth_php_file(site, token: str, uid, gid: int) -> None:
"""
Create the auth.php file in the site's imunify-security directory.
Args:
site: WPSite instance
token: JWT token string
uid, gid: int used for file creation
"""
try:
data_dir = await get_data_dir(site)
auth_file_path = data_dir / "auth.php"
php_content = f""" '{token}',
);
"""
# Run the file write operation in a thread pool
await asyncio.to_thread(
write_plugin_data_file_atomically,
auth_file_path,
php_content,
uid,
gid,
)
logger.info(
"Created auth.php file for site %s at %s", site, auth_file_path
)
except Exception as e:
logger.error("Failed to create auth.php file for site %s: %s", site, e)
raise
async def setup_site_authentication(
site, user_info: pwd.struct_passwd
) -> None:
"""
Set up authentication for a site by creating JWT token and auth.php file.
Args:
site: WPSite instance
user_info: pwd.struct_passwd data
"""
try:
token = generate_token(user_info.pw_name, str(site.docroot))
await create_auth_php_file(
site, token, user_info.pw_uid, user_info.pw_gid
)
logger.info("Successfully set up authentication for site %s", site)
except Exception as e:
logger.error(
"Failed to set up authentication for site %s: %s", site, e
)
raise