#
# Last Change: 2025-10-25
#
import functools
import traceback
import logging
import asyncio


class AsyncOriented:
    """
    Base class for objects whose lifetime is bound to a running event loop.

    AsyncOriented objects have the following properties:
        - They have an async startup and cleanup function, which should be
          called to signal the lifetime of each instance.
          Note: whichever event loop is running during the startup function
          is expected to run for the entire lifetime of this object.
        - They track all their internal tasks if self.create_task is used,
          and drop tasks that are done once the number of tracked tasks has
          passed a certain threshold since the last cleanup, or when
          garbage_collect_tasks / cleanup is explicitly called.
          (The pruning itself is done in a sync manner.)

    Notable Fields / Methods:
        - self.log
            The first access evaluates it on a per-instance level, so be sure
            the values below are set before use. (Most likely, these are
            static values in the child class's body.)
            -- cls.LOG_NAME, cls.LOG_FMT, cls.LOG_LEVEL
        - self.loop -> the unchanging loop for the lifetime of this object
        - self.tracked_tasks -> tasks created and tracked by self.create_task
        - self.wrap_coro_to_log_exceptions() -> can be overridden to customize
          the debugging info

    Make sure you don't accidentally override these without calling super():
        - async self.startup()
        - async self.cleanup()
    """

    # Number of newly tracked tasks allowed before finished ones are pruned.
    TRACKED_TASKS_CLEANUP_AFTER = 64

    LOG_NAME = __name__
    # Either a single format string, or a (fmt, datefmt) sequence that is
    # unpacked into logging.Formatter.
    LOG_FMT = (
        r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
        r"[%Y-%m-%d %H:%M:%S %z]",
    )
    LOG_LEVEL = logging.DEBUG

    @property
    def log(self):
        """
        Return this instance's logger, creating and caching it on first use.

        The logger is looked up by ``LOG_NAME`` and its level is set to
        ``LOG_LEVEL``. A stream handler formatted with ``LOG_FMT`` is attached
        only if this class has not already attached one to that logger.
        """
        cached = getattr(self, "_log", None)
        if cached is not None:
            return cached

        logger = logging.getLogger(self.LOG_NAME)

        # logging.getLogger returns the same logger object for the same name,
        # so attaching a fresh handler per instance would print every record
        # once per instance. Tag our handler and attach it only once.
        already_attached = any(
            getattr(existing, "_async_oriented_handler", False)
            for existing in logger.handlers
        )
        if not already_attached:
            fmt_args = (
                [self.LOG_FMT] if isinstance(self.LOG_FMT, str) else self.LOG_FMT
            )
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(*fmt_args))
            handler._async_oriented_handler = True
            logger.addHandler(handler)

        logger.setLevel(self.LOG_LEVEL)
        self._log = logger
        return logger

    async def startup(self):
        """
        Bind this object to the running event loop and reset task tracking.

        Must be awaited from inside a running loop, and only once per object.
        """
        # Kept as an assert so callers still observe AssertionError.
        # NOTE: asserts are stripped when Python runs with -O.
        assert not hasattr(self, "loop"), (
            "This AsyncOriented object's startup has already been called once."
        )
        self.loop = asyncio.get_running_loop()
        self.tracked_tasks = []
        self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER

    async def cleanup(self):
        """
        Cancel every tracked task that is still pending and wait for them.

        Exceptions raised by the tasks (including cancellation) are collected
        by gather and not propagated.
        """
        # Drop finished tasks first so only live ones get cancelled.
        self.garbage_collect_tasks()

        msg = "Object's lifetime is over, cleanup is called."
        for task in self.tracked_tasks:
            task.cancel(msg=msg)

        await asyncio.gather(*self.tracked_tasks, return_exceptions=True)

        # Every awaited task is done now; prune them from the tracking list.
        self.garbage_collect_tasks()

    def wrap_coro_to_log_exceptions(self, coro, name=None):
        """
        Wrap a coroutine so that anything it raises is logged via self.log.

        Cancellation and KeyboardInterrupt are logged at debug level, any
        other exception at warning level with its traceback. The exception is
        always re-raised after logging. The exact logic and logging system can
        be customized by overriding this function.

        Parameters:
            coro: the awaitable to wrap.
            name: optional label appended to the coroutine's description in
                  the log messages.

        Returns:
            A new coroutine object that awaits ``coro`` and returns its result.
        """
        label = f"{coro} ({name})" if name else f"{coro}"

        async def wrapped():
            try:
                return await coro
            except asyncio.CancelledError as e:
                self.log.debug(f"Task Canceled {label}: {e}")
                raise
            except KeyboardInterrupt as e:
                self.log.debug(f"Task Interrupt {label}: {e}")
                raise
            except BaseException:
                # Deliberately broad: log whatever escaped, then re-raise it.
                self.log.warning(f"Task Error {label}:\n{traceback.format_exc()}")
                raise

        return wrapped()

    def create_task(self, coro, *, name=None, context=None, debug=True):
        """
        Schedule ``coro`` on self.loop and track the resulting task.

        Parameters:
            coro: the coroutine to run.
            name: optional task name, also used in the debug log messages.
            context: optional context forwarded to loop.create_task.
            debug: when true, the coroutine is first passed through
                   wrap_coro_to_log_exceptions.

        Returns:
            The created task.
        """
        if debug:
            coro = self.wrap_coro_to_log_exceptions(coro, name=name)

        task_kwargs = {"name": name}
        if context is not None:
            # loop.create_task only accepts `context` on Python 3.11+, so it
            # is forwarded only when the caller actually supplied one.
            task_kwargs["context"] = context
        task = self.loop.create_task(coro, **task_kwargs)

        self.tracked_tasks.append(task)
        if self.tracked_tasks_next_cleanup <= len(self.tracked_tasks):
            self.garbage_collect_tasks()
        return task

    def garbage_collect_tasks(self):
        """
        Remove finished tasks from self.tracked_tasks and move the threshold.

        The next automatic pruning happens once the list has grown by
        TRACKED_TASKS_CLEANUP_AFTER entries beyond the surviving tasks.
        """
        task_count = len(self.tracked_tasks)  # For the log

        # Single pass, in place, so references to the list stay valid.
        self.tracked_tasks[:] = [t for t in self.tracked_tasks if not t.done()]

        remaining = len(self.tracked_tasks)
        self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER + remaining
        self.log.debug(
            f"Cleaned up {task_count - remaining} of {task_count} tasks,"
            f" next cleanup at: {self.tracked_tasks_next_cleanup}"
        )
#