From 243c43f38eb58711141ec62dc5f7949d6885ec18 Mon Sep 17 00:00:00 2001 From: Oracle Date: Wed, 3 Sep 2025 19:20:24 +0200 Subject: [PATCH] Lots of changes, mostly about model_change signals --- asyncron/models.py | 13 ++++++++----- asyncron/workers.py | 22 +++++++++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/asyncron/models.py b/asyncron/models.py index ec2f151..2683903 100644 --- a/asyncron/models.py +++ b/asyncron/models.py @@ -178,10 +178,6 @@ class Trace( BaseModel ): self.returned = None self.stderr = "" self.stdout = "" - #Runtime Bits - self.loop = asyncio.get_running_loop() - self.new_print = asyncio.Event() - self.commit_on_new_print_task = self.loop.create_task( self.commit_on_new_print() ) try: func = Task.registered_tasks[self.task.name] @@ -193,6 +189,11 @@ class Trace( BaseModel ): self.set_status( "R" ) await self.asave() + #Runtime Bits + self.loop = asyncio.get_running_loop() + self.new_print = asyncio.Event() + self.commit_on_new_print_task = self.loop.create_task( self.commit_on_new_print() ) + try: async with asyncio.timeout( None ) as tmcm: @@ -217,7 +218,9 @@ class Trace( BaseModel ): self.set_status( "C" ) self.returned = output - self.commit_on_new_print_task.cancel() + finally: + self.commit_on_new_print_task.cancel() + self.last_end_datetime = timezone.now() await self.asave() diff --git a/asyncron/workers.py b/asyncron/workers.py index cc16dec..55e3e54 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -116,6 +116,10 @@ class AsyncronWorker: self.loop = asyncio.new_event_loop() asyncio.set_event_loop( self.loop ) + #Run tasks from other threads, safely + self.task_reason_jobs_queue = asyncio.Queue() + self.create_task( self.consume_task_reason_jobs_queue() ) + #Fight over who's gonna be the master, prove your health in the process! self.create_task( retry_on_db_error(self.master_loop)() ) main_task = self.create_task( self.work_loop() ) @@ -190,6 +194,10 @@ class AsyncronWorker: for name in self.watching_models[instance.__class__]: task = Task.registered_tasks[name].task + if task.trace_set.filter( status = "R" ).exists(): + #print("Will not run another trace of the same task to reduce the change of an infinite cycle.") + continue + if task.worker_type not in ("AR" if self.model.is_robust else "AD"): #If we can't run this trace asyncio.run_coroutine_threadsafe( task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ), @@ -197,8 +205,10 @@ class AsyncronWorker: ) continue + #print( threading.current_thread(), signal_name, sender, signal, instance, kwargs ) + asyncio.run_coroutine_threadsafe( - self.start_task_now( task, reason = f"Change ({signal_name}) on {instance}" ), + self.task_reason_jobs_queue.put( (task, f"Change ({signal_name}) on {instance}") ), self.loop ) @@ -345,6 +355,11 @@ class AsyncronWorker: self.work_loop_over.set() + async def consume_task_reason_jobs_queue( self ): + while True: + task, reason = await self.task_reason_jobs_queue.get() + await self.start_task_now( task, reason ) + self.task_reason_jobs_queue.task_done() async def check_scheduled( self ): from .models import Task, Trace @@ -376,7 +391,7 @@ class AsyncronWorker: except IntegrityError: count = 0 if not count: continue #Lost the race condition to another worker. - self.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) + self.create_task( self.start_trace_on_time( trace ) ) async def check_services( self ): from .models import Task, Trace @@ -397,7 +412,7 @@ class AsyncronWorker: if not locked: continue trace = task.new_trace() - trace.status = "W" + trace.set_status( "W", "Waiting to start the service ASAP." ) trace.worker_lock = self.model await trace.asave() @@ -421,6 +436,7 @@ class AsyncronWorker: await trace.arefresh_from_db() await trace.start() + trace.worker_lock = None await trace.asave( update_fields = ['worker_lock'] )