From de2885d20db849bc66ec5f0586f990b8835748da Mon Sep 17 00:00:00 2001 From: Oracle Date: Mon, 1 Sep 2025 13:15:12 +0200 Subject: [PATCH] Fixed django_signal handler regression, bumped version and more error messages now --- asyncron/utils.py | 3 ++- asyncron/workers.py | 34 +++++++++++++++++++++++++--------- setup.py | 2 +- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/asyncron/utils.py b/asyncron/utils.py index ed6cc4d..84815ab 100644 --- a/asyncron/utils.py +++ b/asyncron/utils.py @@ -36,5 +36,6 @@ def ignore_on_db_error( f ): try: return await f( *args, **kwargs ) except OperationalError as e: - return + raise #return For DEV + return decorator diff --git a/asyncron/workers.py b/asyncron/workers.py index e8d0677..cc16dec 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -87,11 +87,27 @@ class AsyncronWorker: self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks self.work_loop_over = asyncio.Event() self.database_unreachable = False + self.all_tasks = [] if daemon: self.thread = threading.Thread( target = self.start ) self.thread.start() + def create_task( self, coro, *, silent = False, name = None, context = None ): + + if not silent: + origina_coro = coro + async def coro(): + try: return await origina_coro + except: + self.log.warning(f"[Asyncron] Task Error {origina_coro} {name}:\n{traceback.format_exc()}" ) + raise + + self.all_tasks.append( self.loop.create_task( coro(), name = name, context = context ) ) + + for task in list(self.all_tasks): + if task.done(): self.all_tasks.remove(task) + def start( self, is_robust = False ): assert not hasattr(self, "loop"), "This worker is already running!" from .models import Worker, Task, Trace @@ -101,8 +117,8 @@ class AsyncronWorker: asyncio.set_event_loop( self.loop ) #Fight over who's gonna be the master, prove your health in the process! - self.loop.create_task( retry_on_db_error(self.master_loop)() ) - main_task = self.loop.create_task( self.work_loop() ) + self.create_task( retry_on_db_error(self.master_loop)() ) + main_task = self.create_task( self.work_loop() ) time.sleep(0.3) #To avoid the django initialization warning! self.model.save() @@ -182,7 +198,7 @@ class AsyncronWorker: continue asyncio.run_coroutine_threadsafe( - self.start_task_now( task ), + self.start_task_now( task, reason = f"Change ({signal_name}) on {instance}" ), self.loop ) @@ -242,7 +258,7 @@ class AsyncronWorker: current_master = True if not self.clearing_dead_workers: - self.loop.create_task( ignore_on_db_error(self.clear_dead_workers)() ) + self.create_task( self.clear_dead_workers() ) await self.sync_tasks() await self.clear_orphaned_traces() @@ -360,7 +376,7 @@ class AsyncronWorker: except IntegrityError: count = 0 if not count: continue #Lost the race condition to another worker. - self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) + self.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) async def check_services( self ): from .models import Task, Trace @@ -386,16 +402,16 @@ class AsyncronWorker: await trace.asave() #self.running_service_tasks[task.id] = - self.loop.create_task( self.start_trace_on_time( trace ) ) + self.create_task( self.start_trace_on_time( trace ) ) await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None ) - async def start_task_now( self, task ): + async def start_task_now( self, task, reason = "" ): trace = task.new_trace() - trace.set_status( "S", f"Change ({signal_name}) on {instance}" ) + trace.set_status( "S", reason ) trace.scheduled_datetime = timezone.now() #So it runs instantly trace.worker_lock_id = self.model.id - self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) + self.create_task( self.start_trace_on_time( trace ) ) async def start_trace_on_time( self, trace ): from .models import Trace diff --git a/setup.py b/setup.py index b8fdef7..2e6182b 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='asyncron', - version='0.1.10', + version='0.1.10.1', packages=find_packages(), #include_package_data=True, # Include static files from MANIFEST.in install_requires=[