diff --git a/asyncron/apps.py b/asyncron/apps.py index e6b2388..4d00a2e 100644 --- a/asyncron/apps.py +++ b/asyncron/apps.py @@ -2,7 +2,7 @@ from django.apps import AppConfig from django.conf import settings from django.apps import apps -import pathlib, importlib, types +import os, pathlib, importlib, types class AsyncronConfig(AppConfig): @@ -16,6 +16,9 @@ class AsyncronConfig(AppConfig): except (KeyError, AttributeError): pass else: self.import_per_app( names ) + #if settings.DEBUG: + # os.environ['PYTHONASYNCIODEBUG'] = "1" + self.load_extensions() #Init the asyncron worker for this process diff --git a/asyncron/models.py b/asyncron/models.py index 1f8dcc2..aef5153 100644 --- a/asyncron/models.py +++ b/asyncron/models.py @@ -1,7 +1,12 @@ from django.utils import timezone from django.db import models from django.db.models.constraints import UniqueConstraint, Q -from unittest.mock import patch #to mock print, can't use redirect_stdout in async code + +#To mock print, can't use redirect_stdout in async code +#This is buggy, has print leakage and still not very good, +#But better than nothing when a task is not self_aware or calls something that isn't +from unittest.mock import patch + import functools, traceback, io import random @@ -173,6 +178,10 @@ class Trace( BaseModel ): self.returned = None self.stderr = "" self.stdout = "" + #Runtime Bits + self.loop = asyncio.get_running_loop() + self.new_print = asyncio.Event() + self.commit_on_new_print_task = self.loop.create_task( self.commit_on_new_print() ) try: func = Task.registered_tasks[self.task.name] @@ -184,42 +193,38 @@ class Trace( BaseModel ): finally: await self.asave() - if self.task.self_aware: - func = functools.partial(func, self) - - #Create an io object to read the print output - new_print = asyncio.Event() #TODO: So we can update the db mid task in an async way - - def confined_print( *args, sep = " ", end = "\n", **kwargs ): - self.stdout += sep.join( str(i) for i in args ) + end - new_print.set() - - async def confined_print_flush(): - while True: - await new_print.wait() - await self.asave( update_fields = ['stdout'] ) - new_print.clear() - - confined_print_flush_task = asyncio.create_task( confined_print_flush() ) - try: - with patch( 'builtins.print', confined_print ): - output = await func( *self.args, **self.kwargs ) + if self.task.self_aware: + output = await func(self, *self.args, **self.kwargs ) + else: + with patch( 'builtins.print', self.print ): + output = await func( *self.args, **self.kwargs ) except Exception as e: self.set_status( "E", f"Exception: {e}" ) self.stderr = traceback.format_exc() - + else: self.set_status( "C" ) self.returned = output finally: - confined_print_flush_task.cancel() + self.commit_on_new_print_task.cancel() self.last_end_datetime = timezone.now() await self.asave() + ### Runtime methods for self aware tasks + async def commit_on_new_print( self ): + while True: + await self.new_print.wait() + await self.asave( update_fields = ['stdout'] ) + self.new_print.clear() + + def print( self, *args, sep = " ", end = "\n", file = None, flush = True ): #We get 'file' here to fool tasks that aren't self_aware + assert hasattr(self, 'commit_on_new_print_task'), "trace.print needs to be called while a trace.commit_on_new_print task is active!" + self.stdout += sep.join( str(i) for i in args ) + end + if flush: self.new_print.set() #https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/ diff --git a/asyncron/workers.py b/asyncron/workers.py index b2f1872..09b16bd 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -117,8 +117,9 @@ class AsyncronWorker: self.attach_django_signals() try: - self.loop.run_forever() #This is the lifetime of this worker + self.loop.run_until_complete( self.work_loop_over.wait() ) #This is the lifetime of this worker except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...") + except RuntimeError: self.log.info(f"[Asyncron][W{self.model.id}] Worker Stopped, exiting...") else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...") self.loop.run_until_complete( self.graceful_shutdown() ) @@ -266,25 +267,36 @@ class AsyncronWorker: async def work_loop( self ): + from .models import Worker, Task, Trace + self.check_interval = 0 - while True: + while await Worker.objects.filter( id = self.model.id ).aexists(): + await asyncio.sleep( self.check_interval ) self.check_interval = 10 + try: await self.check_services() await self.check_scheduled() - await sync_to_async( close_old_connections )() except OperationalError as e: self.log.warning(f"[Asyncron] DB Connection Error: {e}") print( traceback.format_exc() ) - break + self.check_interval = 60 #break except Exception as e: self.log.warning(f"[Asyncron] check_scheduled failed: {e}") print( traceback.format_exc() ) self.check_interval = 20 + try: + await sync_to_async( close_old_connections )() + except Exception as e: + self.log.warning(f"[Asyncron] close_old_connections failed: {e}") + print( traceback.format_exc() ) + #break + + self.work_loop_over.set() @@ -314,7 +326,8 @@ class AsyncronWorker: await trace.eval_related() #print(f"Checking {trace} to do now: {trace.scheduled_datetime - timezone.now()}") - count = await Trace.objects.filter( id = trace.id, status = "S" ).aupdate( status = "W", worker_lock = self.model ) + try: count = await Trace.objects.filter( id = trace.id, status = "S" ).aupdate( status = "W", worker_lock = self.model ) + except IntegrityError: count = 0 if not count: continue #Lost the race condition to another worker. self.loop.create_task( self.start_trace_on_time( trace ) )