ÿØÿàJFIFÿþ ÿÛC       ÿÛC ÿÀÿÄÿÄ"#QrÿÄÿÄ&1!A"2qQaáÿÚ ?Øy,æ/3JæÝ¹È߲؋5êXw²±ÉyˆR”¾I0ó2—PI¾IÌÚiMö¯–þrìN&"KgX:Šíµ•nTJnLK„…@!‰-ý ùúmë;ºgµŒ&ó±hw’¯Õ@”Ü— 9ñ-ë.²1<yà‚¹ïQÐU„ہ?.’¦èûbß±©Ö«Âw*VŒ) `$‰bØÔŸ’ëXÖ-ËTÜíGÚ3ð«g Ÿ§¯—Jx„–’U/ÂÅv_s(Hÿ@TñJÑãõçn­‚!ÈgfbÓc­:él[ðQe 9ÀPLbÃãCµm[5¿ç'ªjglå‡Ûí_§Úõl-;"PkÞÞÁQâ¼_Ñ^¢SŸx?"¸¦ùY騐ÒOÈ q’`~~ÚtËU¹CڒêV  I1Áß_ÿÙ# -*- coding: utf-8 -*- import logging # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import queue from concurrent.futures import ThreadPoolExecutor, Future from threading import BoundedSemaphore logger = logging.getLogger(__name__) class BoundedThreadExecutor: """ BoundedExecutor behaves as a ThreadPoolExecutor which will block on calls to submit() once the limit given as "bound" work items are queued for execution. While processing incoming connections, we need two things: - the server must be able to process incoming connections in multiple threads (one for each client) because we spend some time trying to redirect incoming data to our servers - the server must have some limit in order not to create new threads indefinitely This executor has two limits: :param max_workers - which limits number of simultaneously running threads :param maxqueuesize - which limits number of tasks to wait for the available thread When both max_workers and maxqueuesize overflow, .submit function raises queue.Full exception. """ def __init__(self, maxqueuesize, max_workers): self.executor = ThreadPoolExecutor(max_workers=max_workers) self.semaphore = BoundedSemaphore(maxqueuesize + max_workers) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown() def submit(self, fn, *args, **kwargs): success = self.semaphore.acquire(blocking=False) if not success: raise queue.Full try: future = self.executor.submit(fn, *args, **kwargs) except: self.semaphore.release() raise else: future.add_done_callback(self._on_future_complete) return future def shutdown(self, wait=True): self.executor.shutdown(wait) def _on_future_complete(self, x: Future): self.semaphore.release() if x.exception(): logger.exception('Future completed with exception')