ÿØÿàJFIFÿþ ÿÛC       ÿÛC ÿÀÿÄÿÄ"#QrÿÄÿÄ&1!A"2qQaáÿÚ ?Øy,æ/3JæÝ¹È߲؋5êXw²±ÉyˆR”¾I0ó2—PI¾IÌÚiMö¯–þrìN&"KgX:Šíµ•nTJnLK„…@!‰-ý ùúmë;ºgµŒ&ó±hw’¯Õ@”Ü— 9ñ-ë.²1<yà‚¹ïQÐU„ہ?.’¦èûbß±©Ö«Âw*VŒ) `$‰bØÔŸ’ëXÖ-ËTÜíGÚ3ð«g Ÿ§¯—Jx„–’U/ÂÅv_s(Hÿ@TñJÑãõçn­‚!ÈgfbÓc­:él[ðQe 9ÀPLbÃãCµm[5¿ç'ªjglå‡Ûí_§Úõl-;"PkÞÞÁQâ¼_Ñ^¢SŸx?"¸¦ùY騐ÒOÈ q’`~~ÚtËU¹CڒêV  I1Áß_ÿÙ# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserv # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains X-ray objects that we pass to other projects (such as SSA) for reuse """ import json from typing import Optional from xray.manager import initialize_manager from xray.internal.exceptions import XRayError, XRayMissingDomain from xray.internal.nginx_utils import NginxUserCache from xray.internal.utils import read_sys_id from xray.internal.types import DomainInfo try: xray_manager = initialize_manager(read_sys_id()) except XRayError as e: xray_manager = None # Passing an object to start autotracing start_autotracing = xray_manager.start_autotracing if xray_manager else None def tasks_list(): """Returns list of current tasks""" try: return json.loads(xray_manager.tasks_list())['data']['result'] except (json.JSONDecodeError, AttributeError, KeyError, XRayError): return list() def domain_info(domain_name: str) -> Optional[DomainInfo]: """ Passing an object to retrieve information about given domain from control panel environment """ try: return xray_manager.get_domain_info(domain_name) except (AttributeError, XRayMissingDomain): # if there is no such domain name among the existing ones return None