diff --git a/python/asynctools.py b/python/asynctools.py new file mode 100644 index 0000000..4234fa0 --- /dev/null +++ b/python/asynctools.py @@ -0,0 +1,146 @@ +# +# Last Change: 2025-10-25 +# +import functools +import traceback +import logging +import asyncio + +class AsyncOriented: + """ + AsyncOriented objects have the following properties: + + - They have an async startup and cleanup function, + which should be called to signal the life time of each instance + + Note: whichever event loop is used during the startup function, is expected to + run for the entire lifetime of this object. + + - They will track all their internal tasks if self.create_task is used, + and will perform cleanup on tasks that are done, + once the task counter has passed a certain number since the last cleanup, + or the cleanup function is explicitly called. (The cleanup is done in a sync manner) + + Notable Fields / Methods: + + - self.log + first call will evaluate it on a per instance level, + so be sure you've set the below methods before use. + (Most likley, these are static values in the child class's body) + -- cls.LOG_NAME, cls.LOG_FMT, cls.LOG_LEVEL + + - self.loop -> The unchanging loop for the lifetime of this object + + - self.tracked_tasks -> tasks created and tracked by self.create_task + - self.wrap_coro_to_log_exceptions() -> can be overriden to customize the debugging info + + Make sure u don't accidentally override these without calling super() + - async self.startup() + - async self.cleanup() + """ + + TRACKED_TASKS_CLEANUP_AFTER = 64 + + LOG_NAME = __name__ + LOG_FMT = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s", r"[%Y-%m-%d %H:%M:%S %z]" + LOG_LEVEL = logging.DEBUG + + @property + def log( self ): + if hasattr( self, '_log' ): return self._log + + log = logging.getLogger(self.LOG_NAME) + #print(f"Creating new log for self: {self}, {log} {id(log)=} {log.handlers}") + + handler = logging.StreamHandler() + formatter = logging.Formatter( *( + [self.LOG_FMT] + if isinstance(self.LOG_FMT, str) else + self.LOG_FMT + ) ) + handler.setFormatter(formatter) + log.addHandler(handler) + log.setLevel(self.LOG_LEVEL) + self._log = log + return log + + + + async def startup( self ): + assert not hasattr(self, "loop"), f"This AsyncOriented object's startup has already been called once." + self.loop = asyncio.get_running_loop() + self.tracked_tasks = [] + self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER + + async def cleanup( self ): + self.garbage_collect_tasks() + msg = "Object's lifetime is over, cleanup is called." + for task in self.tracked_tasks: + task.cancel( msg = msg ) + + await asyncio.gather( *self.tracked_tasks, return_exceptions = True ) + self.garbage_collect_tasks() + + def wrap_coro_to_log_exceptions( self, coro, name = None ): + """ + This function wraps a corouting, catches the exception it might return, + and logs it using self.log, The exact logic and logging system can be customized + by overriding this function. + """ + + repr = f"{coro} ({name})" if name else f"{coro}" + async def wrapped(): + try: + return await wrapped.coro + + except asyncio.exceptions.CancelledError as e: + self.log.debug( f"Task Canceled {repr}: {e}" ) + raise + + except KeyboardInterrupt as e: + self.log.debug( f"Task Interrupt {repr}: {e}" ) + raise + + except: + self.log.warning(f"Task Error {repr}:\n{traceback.format_exc()}" ) + raise + + wrapped.coro = coro + return wrapped() + + def create_task( self, coro, *, name = None, context = None, debug = True ): + + if debug: coro = self.wrap_coro_to_log_exceptions( coro, name = name ) + + task = self.loop.create_task( coro, name = name, context = context ) + self.tracked_tasks.append( task ) + + if self.tracked_tasks_next_cleanup <= len(self.tracked_tasks): + self.garbage_collect_tasks() + + return task + + + def garbage_collect_tasks( self ): + + task_count = len(self.tracked_tasks) #For the log + + for t in list(self.tracked_tasks): + if t.done(): self.tracked_tasks.remove(t) + + self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER + len(self.tracked_tasks) + + self.log.debug( + f"Cleaned up {task_count - len(self.tracked_tasks)} of {task_count} tasks," + f" next cleanup at: {self.tracked_tasks_next_cleanup}" + ) + + + + + + + + + +#