diff --git a/asyncron/__init__.py b/asyncron/__init__.py index 526d263..d0100b3 100644 --- a/asyncron/__init__.py +++ b/asyncron/__init__.py @@ -1,4 +1,4 @@ -from .shortcuts import task, run_on_model_change, extension +from .shortcuts import task, service, run_on_model_change, extension __all__ = [ 'task', diff --git a/asyncron/admin.py b/asyncron/admin.py index df81315..bad4a71 100644 --- a/asyncron/admin.py +++ b/asyncron/admin.py @@ -40,7 +40,7 @@ class WorkerAdmin( BaseModelAdmin ): class TaskAdmin( BaseModelAdmin ): order = 1 list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged_executions', 'last_execution', 'scheduled' - fields = ["name", "description", "type", "on_model_change", "jitter"] + fields = ["name", "description", "type", "on_model_change", "jitter", "self_aware"] actions = 'schedule_execution', 'execution_now', 'delete_script_missing' def has_add_permission( self, request, obj = None ): return False diff --git a/asyncron/migrations/0002_task_self_aware.py b/asyncron/migrations/0002_task_self_aware.py new file mode 100644 index 0000000..8e35e49 --- /dev/null +++ b/asyncron/migrations/0002_task_self_aware.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.2 on 2025-01-21 22:21 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('asyncron', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='task', + name='self_aware', + field=models.BooleanField(default=False, help_text="Whether It's first argument is 'self', being a trace instance."), + ), + ] diff --git a/asyncron/models.py b/asyncron/models.py index bb0d74f..1f8dcc2 100644 --- a/asyncron/models.py +++ b/asyncron/models.py @@ -55,6 +55,7 @@ class Task( BaseModel ): null = True, blank = True ) #None will mean it's a "service" like task gracetime = models.DurationField( default = timezone.timedelta( minutes = 1 ) ) + self_aware = models.BooleanField( default = False, help_text = "Whether It's first argument is 'self', being a trace instance." ) #Periodic Tasks interval = models.DurationField( null = True, blank = True ) @@ -164,7 +165,6 @@ class Trace( BaseModel ): async def start( self ): - #assert self.status == "W", f"Cannot start a task that is not Waiting ({self.get_status_display()})." await self.eval_related('task') assert self.status in "SPAWE", f"Cannot start a task that is in {self.get_status_display()} state!" @@ -184,13 +184,23 @@ class Trace( BaseModel ): finally: await self.asave() + if self.task.self_aware: + func = functools.partial(func, self) #Create an io object to read the print output - new_lines = asyncio.Event() #TODO: So we can update the db mid task in an async way + new_print = asyncio.Event() #TODO: So we can update the db mid task in an async way def confined_print( *args, sep = " ", end = "\n", **kwargs ): self.stdout += sep.join( str(i) for i in args ) + end - new_lines.set() + new_print.set() + + async def confined_print_flush(): + while True: + await new_print.wait() + await self.asave( update_fields = ['stdout'] ) + new_print.clear() + + confined_print_flush_task = asyncio.create_task( confined_print_flush() ) try: with patch( 'builtins.print', confined_print ): @@ -205,18 +215,11 @@ class Trace( BaseModel ): self.returned = output finally: + confined_print_flush_task.cancel() self.last_end_datetime = timezone.now() await self.asave() - #TODO: Cool stuff to add later - #callee = models.TextField() - #caller = models.TextField() - #repeatable = models.BooleanField( default = True ) #Unless something in args or kwargs is unserializable! - #tags = models.JSONField( default = list ) - # - - #https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/ diff --git a/asyncron/shortcuts.py b/asyncron/shortcuts.py index 8204901..8b5a4a7 100644 --- a/asyncron/shortcuts.py +++ b/asyncron/shortcuts.py @@ -46,6 +46,12 @@ def task( *args, **kwargs ): return Task( *args, **kwargs ).register +def service( *args, **kwargs ): + kwargs.setdefault( 'timeout', None ) + kwargs.setdefault( 'worker_type', "R" ) + kwargs.setdefault( 'self_aware', True ) + return task( *args, **kwargs ) + def run_on_model_change( *models ): models = [ diff --git a/asyncron/workers.py b/asyncron/workers.py index d937bc2..b2f1872 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -272,6 +272,7 @@ class AsyncronWorker: await asyncio.sleep( self.check_interval ) self.check_interval = 10 try: + await self.check_services() await self.check_scheduled() await sync_to_async( close_old_connections )() except OperationalError as e: @@ -321,8 +322,10 @@ class AsyncronWorker: async def start_trace_on_time( self, trace ): from .models import Trace - await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() ) - await trace.arefresh_from_db() + if trace.task.timeout: #If this is from a periodic task + await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() ) + await trace.arefresh_from_db() + await trace.start() trace.worker_lock = None await trace.asave( update_fields = ['worker_lock'] ) @@ -337,3 +340,29 @@ class AsyncronWorker: await QuerySet.exclude( id__in = QuerySet[:max_count].values_list( 'id', flat = True ) ).adelete() + + + async def check_services( self ): + from .models import Task, Trace + + #start services that aren't running yet. + Ts = Task.objects.filter( interval = None ).exclude( + trace__status__in = "WR" + ).exclude( worker_type = "D" if self.model.is_robust else "R" ) + + async for task in Ts: + + locked = await Task.objects.filter( id = task.id ).filter( + models.Q(worker_lock = None) | + models.Q(worker_lock = self.model) #This is incase the lock has been aquired for some reason before. + ).aupdate( worker_lock = self.model ) + if not locked: continue + + trace = task.new_trace() + trace.status = "W" + trace.worker_lock = self.model + await trace.asave() + + #self.running_service_tasks[task.id] = + self.loop.create_task( self.start_trace_on_time( trace ) ) + await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None )