From 8fe8d5ec8c943d1503c99905e0d58eb72e63fe13 Mon Sep 17 00:00:00 2001 From: Oracle Date: Mon, 5 May 2025 10:45:29 +0200 Subject: [PATCH] Improved quick execution to be instant when the current worker supports it --- asyncron/models.py | 8 +++--- asyncron/workers.py | 67 +++++++++++++++++++++++++++++---------------- 2 files changed, 47 insertions(+), 28 deletions(-) diff --git a/asyncron/models.py b/asyncron/models.py index 029a617..196ea8a 100644 --- a/asyncron/models.py +++ b/asyncron/models.py @@ -187,11 +187,11 @@ class Trace( BaseModel ): func = Task.registered_tasks[self.task.name] except KeyError: self.set_status( "E", "Script Missing!" ) - return - else: - self.set_status( "R" ) - finally: await self.asave() + return + + self.set_status( "R" ) + await self.asave() try: if self.task.self_aware: diff --git a/asyncron/workers.py b/asyncron/workers.py index b9bbe0d..2eccf8b 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -171,11 +171,23 @@ class AsyncronWorker: def model_changed( self, signal_name, sender, signal, instance, **kwargs ): from .models import Task for name in self.watching_models[instance.__class__]: + task = Task.registered_tasks[name].task + + if task.worker_type not in ("AR" if self.model.is_robust else "AD"): #If we can't run this trace + asyncio.run_coroutine_threadsafe( + task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ), + self.loop + ) + continue + asyncio.run_coroutine_threadsafe( - Task.registered_tasks[name].task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ), + self.start_task_now( task ), self.loop ) + + + async def graceful_shutdown( self ): try: for attempt in range( 10 ): @@ -335,29 +347,6 @@ class AsyncronWorker: self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) - async def start_trace_on_time( self, trace ): - from .models import Trace - - if trace.task.timeout: #If this is from a periodic task - await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() ) - await trace.arefresh_from_db() - - await trace.start() - trace.worker_lock = None - await trace.asave( update_fields = ['worker_lock'] ) - - #Traces for the same task that we are done with (Completed, Aborted, Errored) - QuerySet = Trace.objects.filter( - task_id = trace.task_id, status__in = "CAE", protected = False, worker_lock = None - ).order_by('-register_datetime') - - #Should be deleted after the threashold - max_count = trace.task.max_completed_traces if trace.status == "C" else trace.task.max_failed_traces - await QuerySet.exclude( - id__in = QuerySet[:max_count].values_list( 'id', flat = True ) - ).adelete() - - async def check_services( self ): from .models import Task, Trace @@ -382,3 +371,33 @@ class AsyncronWorker: #self.running_service_tasks[task.id] = self.loop.create_task( self.start_trace_on_time( trace ) ) await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None ) + + async def start_task_now( self, task ): + trace = task.new_trace() + trace.set_status( "S", f"Change ({signal_name}) on {instance}" ) + trace.scheduled_datetime = timezone.now() #So it runs instantly + trace.worker_lock_id = self.model.id + + self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) + + async def start_trace_on_time( self, trace ): + from .models import Trace + + if trace.scheduled_datetime and timezone.now() < trace.scheduled_datetime: #If this is a periodic task + await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() ) + await trace.arefresh_from_db() + + await trace.start() + trace.worker_lock = None + await trace.asave( update_fields = ['worker_lock'] ) + + #Traces for the same task that we are done with (Completed, Aborted, Errored) + QuerySet = Trace.objects.filter( + task_id = trace.task_id, status__in = "CAE", protected = False, worker_lock = None + ).order_by('-register_datetime') + + #Should be deleted after the threashold + max_count = trace.task.max_completed_traces if trace.status == "C" else trace.task.max_failed_traces + await QuerySet.exclude( + id__in = QuerySet[:max_count].values_list( 'id', flat = True ) + ).adelete()