# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from dataclasses import dataclass
from typing import Optional

from xray import gettext as _
from .constants import fpm_stat_storage, fpm_reload_timeout
from .exceptions import XRayError
from .utils import dbm_storage, timestamp


@dataclass
class FPMReloadController:
    """
    Class to control the frequency of FPM service reloading.

    The timestamp of the latest reload is kept in the storage opened via
    ``dbm_storage(fpm_stat_storage)``, keyed by the FPM service name.
    """
    # name of the FPM service; used as the key in the reload-stat storage
    fpm_service: str

    @property
    def latest_reload(self) -> Optional[int]:
        """
        Get the timestamp of latest reload of FPM service.

        :return: the stored timestamp converted to int, or None if the
                 storage has no entry for this service
        :raises XRayError: (flag='warning') if a RuntimeError is raised
                           while working with the storage
        :raises ValueError: if the stored value cannot be decoded or
                            parsed as an integer
        """
        try:
            with dbm_storage(fpm_stat_storage) as _fpm_reload_stat:
                # KeyError is handled inside the `with` so that the storage
                # context exits normally when the service has no record yet
                try:
                    return int(_fpm_reload_stat[self.fpm_service].decode())
                except KeyError:
                    return None
        except RuntimeError as e:
            # keep the original error as __cause__ for easier debugging
            raise XRayError(
                _('Failed to get the timestamp of latest FPM reload: %s') % str(e),
                flag='warning') from e

    def save_latest_reload(self) -> None:
        """
        Save the timestamp of latest reload of FPM service.

        :raises XRayError: (flag='warning') if a RuntimeError is raised
                           while working with the storage
        """
        try:
            # save timestamp of latest reload of corresponding service
            with dbm_storage(fpm_stat_storage) as _fpm_reload_stat:
                _fpm_reload_stat[self.fpm_service] = str(timestamp())
        except RuntimeError as e:
            # keep the original error as __cause__ for easier debugging
            raise XRayError(
                _('Failed to save the timestamp of latest FPM reload %s, but main operation executed') % str(e),
                flag='warning') from e

    def delta(self) -> float:
        """
        Get the time delta between current time and saved timestamp.

        :return: delta in minutes; 60.0 if there is no saved timestamp or
                 it could not be obtained
        """
        # 1 hour,
        # big enough for non-blocking flow in case of unexpected errors
        fallback = 60.0
        try:
            latest = self.latest_reload
            if latest is None:
                # no reload has been recorded for this service yet
                return fallback
            # we store timestamp in seconds,
            # but timeout -- fpm_reload_timeout -- in minutes
            return (timestamp() - latest) / 60
        except (TypeError, ValueError, XRayError):
            # ValueError covers a stored value that cannot be decoded or
            # parsed as an integer; treat it like any other failure to read
            # the timestamp instead of letting it escape to the caller
            return fallback

    def restrict(self) -> bool:
        """
        Check if FPM reload should be blocked.

        :return: True if the delta (in minutes) since the latest reload
                 does not exceed fpm_reload_timeout
        """
        return self.delta() <= fpm_reload_timeout