147 lines
4.0 KiB
Python
147 lines
4.0 KiB
Python
#
|
|
# Last Change: 2025-10-25
|
|
#
|
|
import functools
|
|
import traceback
|
|
import logging
|
|
import asyncio
|
|
|
|
class AsyncOriented:
    """
    Base class for objects whose lifetime is tied to one asyncio event loop.

    AsyncOriented objects have the following properties:

    - They have an async startup and cleanup function, which should be
      called to signal the lifetime of each instance.

      Note: whichever event loop is running during startup() is expected
      to keep running for the entire lifetime of this object.

    - They track all their internal tasks if self.create_task is used, and
      drop finished tasks from the tracking list once the number of tracked
      tasks has passed a threshold since the last collection, or when
      cleanup() / garbage_collect_tasks() is called explicitly.
      (Dropping finished tasks is done synchronously.)

    Notable fields / methods:

    - self.log
        Evaluated lazily, per instance, on first access, so be sure the
        class attributes below are set before first use (most likely they
        are static values in the child class's body):
        cls.LOG_NAME, cls.LOG_FMT, cls.LOG_LEVEL

    - self.loop -> the loop captured by startup()
    - self.tracked_tasks -> tasks created and tracked by self.create_task
    - self.wrap_coro_to_log_exceptions() -> can be overridden to customize
      the debugging info

    Make sure you don't accidentally override these without calling super():

    - async self.startup()
    - async self.cleanup()
    """

    # Number of additional tracked tasks allowed (on top of the ones still
    # alive after the previous collection) before create_task() triggers
    # another garbage_collect_tasks().
    TRACKED_TASKS_CLEANUP_AFTER = 64

    # Logger name handed to logging.getLogger().
    LOG_NAME = __name__
    # Either a single format string, or a sequence of positional arguments
    # for logging.Formatter (here: message format, date format).
    LOG_FMT = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s", r"[%Y-%m-%d %H:%M:%S %z]"
    # Level applied to the logger whenever an instance first resolves self.log.
    LOG_LEVEL = logging.DEBUG

    # Attribute set on the StreamHandler installed by the `log` property so
    # that later instances can tell the shared logger is already configured.
    _LOG_HANDLER_MARK = "_async_oriented_handler"

    @property
    def log(self):
        """
        Return the logger for this instance, configuring it on first access.

        logging.getLogger() returns the same logger object for the same
        name, so the stream handler is attached only once per logger.
        Attaching one handler per instance would print every record once
        for each instance ever created. As a consequence, the first
        instance to configure a given LOG_NAME decides the format.
        """
        if hasattr(self, '_log'):
            return self._log

        log = logging.getLogger(self.LOG_NAME)

        already_configured = any(
            getattr(existing, self._LOG_HANDLER_MARK, False)
            for existing in log.handlers
        )
        if not already_configured:
            # LOG_FMT may be a bare string or a sequence of Formatter args.
            if isinstance(self.LOG_FMT, str):
                formatter_args = [self.LOG_FMT]
            else:
                formatter_args = self.LOG_FMT

            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(*formatter_args))
            setattr(handler, self._LOG_HANDLER_MARK, True)
            log.addHandler(handler)

        log.setLevel(self.LOG_LEVEL)
        self._log = log
        return log

    async def startup(self):
        """
        Bind this object to the running event loop and reset task tracking.

        Must be awaited exactly once per instance. A second call trips the
        assertion below; it is kept as an assert so that callers which
        already catch AssertionError keep working.
        """
        assert not hasattr(self, "loop"), "This AsyncOriented object's startup has already been called once."
        self.loop = asyncio.get_running_loop()
        self.tracked_tasks = []
        self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER

    async def cleanup(self):
        """
        Cancel every tracked task that is still running and wait for them.

        Exceptions raised by the tasks (including their CancelledError) are
        collected by gather() and not propagated to the caller.
        """
        self.garbage_collect_tasks()
        msg = "Object's lifetime is over, cleanup is called."

        # If cleanup() itself runs inside a tracked task, cancelling and
        # gathering that task would abort the cleanup half-way, so it is
        # left alone (and stays tracked until it finishes).
        current = asyncio.current_task()
        pending = [task for task in self.tracked_tasks if task is not current]

        for task in pending:
            task.cancel(msg=msg)

        await asyncio.gather(*pending, return_exceptions=True)
        self.garbage_collect_tasks()

    def wrap_coro_to_log_exceptions(self, coro, name=None):
        """
        Wrap a coroutine so that whatever it raises is logged via self.log.

        Cancellation and KeyboardInterrupt are logged at debug level, any
        other exception at warning level together with its traceback. The
        exception is always re-raised, so the awaiting side still sees it.
        The exact logic and logging system can be customized by overriding
        this function.

        Returns a new coroutine object that awaits `coro`.
        """
        # Built eagerly, so the label shows the coroutine as it looked at
        # wrap time.
        label = f"{coro} ({name})" if name else f"{coro}"

        async def wrapped():
            try:
                return await coro

            except asyncio.CancelledError as e:
                self.log.debug("Task Canceled %s: %s", label, e)
                raise

            except KeyboardInterrupt as e:
                self.log.debug("Task Interrupt %s: %s", label, e)
                raise

            except BaseException:
                # Deliberately broad: everything is logged and then
                # re-raised, nothing is swallowed here.
                self.log.warning("Task Error %s:\n%s", label, traceback.format_exc())
                raise

        return wrapped()

    def create_task(self, coro, *, name=None, context=None, debug=True):
        """
        Schedule `coro` on self.loop and remember the resulting task.

        name    -- forwarded to loop.create_task(), also used in log labels
        context -- forwarded to loop.create_task() only when given, because
                   that keyword does not exist before Python 3.11
        debug   -- when true, the coroutine is first passed through
                   wrap_coro_to_log_exceptions()

        Returns the created asyncio.Task.
        """
        if debug:
            coro = self.wrap_coro_to_log_exceptions(coro, name=name)

        task_kwargs = {"name": name}
        if context is not None:
            task_kwargs["context"] = context

        task = self.loop.create_task(coro, **task_kwargs)
        self.tracked_tasks.append(task)

        if self.tracked_tasks_next_cleanup <= len(self.tracked_tasks):
            self.garbage_collect_tasks()

        return task

    def garbage_collect_tasks(self):
        """
        Drop finished tasks from self.tracked_tasks and re-arm the threshold.

        The list is filtered in place (same list object) in a single pass.
        """
        task_count = len(self.tracked_tasks)  # For the log

        self.tracked_tasks[:] = [
            task for task in self.tracked_tasks if not task.done()
        ]

        self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER + len(self.tracked_tasks)

        self.log.debug(
            "Cleaned up %s of %s tasks, next cleanup at: %s",
            task_count - len(self.tracked_tasks),
            task_count,
            self.tracked_tasks_next_cleanup,
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#
|