lock raises error
This commit is contained in:
parent
429c694629
commit
0361926f32
146
asyncron/asynctools.py
Normal file
146
asyncron/asynctools.py
Normal file
|
|
@ -0,0 +1,146 @@
|
|||
#
|
||||
# Last Change: 2025-10-25
|
||||
#
|
||||
import functools
|
||||
import traceback
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
class AsyncOriented:
|
||||
"""
|
||||
AsyncOriented objects have the following properties:
|
||||
|
||||
- They have an async startup and cleanup function,
|
||||
which should be called to signal the life time of each instance
|
||||
|
||||
Note: whichever event loop is used during the startup function, is expected to
|
||||
run for the entire lifetime of this object.
|
||||
|
||||
- They will track all their internal tasks if self.create_task is used,
|
||||
and will perform cleanup on tasks that are done,
|
||||
once the task counter has passed a certain number since the last cleanup,
|
||||
or the cleanup function is explicitly called. (The cleanup is done in a sync manner)
|
||||
|
||||
Notable Fields / Methods:
|
||||
|
||||
- self.log
|
||||
first call will evaluate it on a per instance level,
|
||||
so be sure you've set the below methods before use.
|
||||
(Most likley, these are static values in the child class's body)
|
||||
-- cls.LOG_NAME, cls.LOG_FMT, cls.LOG_LEVEL
|
||||
|
||||
- self.loop -> The unchanging loop for the lifetime of this object
|
||||
|
||||
- self.tracked_tasks -> tasks created and tracked by self.create_task
|
||||
- self.wrap_coro_to_log_exceptions() -> can be overriden to customize the debugging info
|
||||
|
||||
Make sure u don't accidentally override these without calling super()
|
||||
- async self.startup()
|
||||
- async self.cleanup()
|
||||
"""
|
||||
|
||||
TRACKED_TASKS_CLEANUP_AFTER = 64
|
||||
|
||||
LOG_NAME = __name__
|
||||
LOG_FMT = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s", r"[%Y-%m-%d %H:%M:%S %z]"
|
||||
LOG_LEVEL = logging.DEBUG
|
||||
|
||||
@property
|
||||
def log( self ):
|
||||
if hasattr( self, '_log' ): return self._log
|
||||
|
||||
log = logging.getLogger(self.LOG_NAME)
|
||||
#print(f"Creating new log for self: {self}, {log} {id(log)=} {log.handlers}")
|
||||
|
||||
handler = logging.StreamHandler()
|
||||
formatter = logging.Formatter( *(
|
||||
[self.LOG_FMT]
|
||||
if isinstance(self.LOG_FMT, str) else
|
||||
self.LOG_FMT
|
||||
) )
|
||||
handler.setFormatter(formatter)
|
||||
log.addHandler(handler)
|
||||
log.setLevel(self.LOG_LEVEL)
|
||||
self._log = log
|
||||
return log
|
||||
|
||||
|
||||
|
||||
async def startup( self ):
|
||||
assert not hasattr(self, "loop"), f"This AsyncOriented object's startup has already been called once."
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.tracked_tasks = []
|
||||
self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER
|
||||
|
||||
async def cleanup( self ):
|
||||
self.garbage_collect_tasks()
|
||||
msg = "Object's lifetime is over, cleanup is called."
|
||||
for task in self.tracked_tasks:
|
||||
task.cancel( msg = msg )
|
||||
|
||||
await asyncio.gather( *self.tracked_tasks, return_exceptions = True )
|
||||
self.garbage_collect_tasks()
|
||||
|
||||
def wrap_coro_to_log_exceptions( self, coro, name = None ):
|
||||
"""
|
||||
This function wraps a corouting, catches the exception it might return,
|
||||
and logs it using self.log, The exact logic and logging system can be customized
|
||||
by overriding this function.
|
||||
"""
|
||||
|
||||
repr = f"{coro} ({name})" if name else f"{coro}"
|
||||
async def wrapped():
|
||||
try:
|
||||
return await wrapped.coro
|
||||
|
||||
except asyncio.exceptions.CancelledError as e:
|
||||
self.log.debug( f"Task Canceled {repr}: {e}" )
|
||||
raise
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
self.log.debug( f"Task Interrupt {repr}: {e}" )
|
||||
raise
|
||||
|
||||
except:
|
||||
self.log.warning(f"Task Error {repr}:\n{traceback.format_exc()}" )
|
||||
raise
|
||||
|
||||
wrapped.coro = coro
|
||||
return wrapped()
|
||||
|
||||
def create_task( self, coro, *, name = None, context = None, debug = True ):
|
||||
|
||||
if debug: coro = self.wrap_coro_to_log_exceptions( coro, name = name )
|
||||
|
||||
task = self.loop.create_task( coro, name = name, context = context )
|
||||
self.tracked_tasks.append( task )
|
||||
|
||||
if self.tracked_tasks_next_cleanup <= len(self.tracked_tasks):
|
||||
self.garbage_collect_tasks()
|
||||
|
||||
return task
|
||||
|
||||
|
||||
def garbage_collect_tasks( self ):
|
||||
|
||||
task_count = len(self.tracked_tasks) #For the log
|
||||
|
||||
for t in list(self.tracked_tasks):
|
||||
if t.done(): self.tracked_tasks.remove(t)
|
||||
|
||||
self.tracked_tasks_next_cleanup = self.TRACKED_TASKS_CLEANUP_AFTER + len(self.tracked_tasks)
|
||||
|
||||
self.log.debug(
|
||||
f"Cleaned up {task_count - len(self.tracked_tasks)} of {task_count} tasks,"
|
||||
f" next cleanup at: {self.tracked_tasks_next_cleanup}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#
|
||||
|
|
@ -486,10 +486,13 @@ class AsyncronWorker:
|
|||
trace = task.new_trace()
|
||||
await trace.reschedule( reason = "Auto Scheduled" )
|
||||
|
||||
try:
|
||||
locked = await Task.objects.filter( id = task.id ).filter(
|
||||
models.Q(worker_lock = None) |
|
||||
models.Q(worker_lock = self.model) #This is incase the lock has been aquired for some reason before.
|
||||
).aupdate( worker_lock = self.model )
|
||||
except IntegrityError:
|
||||
locked = False
|
||||
|
||||
if locked:
|
||||
await trace.asave()
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user