hopefully fixed the long execution errors
This commit is contained in:
parent
9ab4e3ea8d
commit
c6ecc71517
|
|
@ -2,7 +2,7 @@ from django.apps import AppConfig
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.apps import apps
|
from django.apps import apps
|
||||||
|
|
||||||
import pathlib, importlib, types
|
import os, pathlib, importlib, types
|
||||||
|
|
||||||
|
|
||||||
class AsyncronConfig(AppConfig):
|
class AsyncronConfig(AppConfig):
|
||||||
|
|
@ -16,6 +16,9 @@ class AsyncronConfig(AppConfig):
|
||||||
except (KeyError, AttributeError): pass
|
except (KeyError, AttributeError): pass
|
||||||
else: self.import_per_app( names )
|
else: self.import_per_app( names )
|
||||||
|
|
||||||
|
#if settings.DEBUG:
|
||||||
|
# os.environ['PYTHONASYNCIODEBUG'] = "1"
|
||||||
|
|
||||||
self.load_extensions()
|
self.load_extensions()
|
||||||
|
|
||||||
#Init the asyncron worker for this process
|
#Init the asyncron worker for this process
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,12 @@
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.db import models
|
from django.db import models
|
||||||
from django.db.models.constraints import UniqueConstraint, Q
|
from django.db.models.constraints import UniqueConstraint, Q
|
||||||
from unittest.mock import patch #to mock print, can't use redirect_stdout in async code
|
|
||||||
|
#To mock print, can't use redirect_stdout in async code
|
||||||
|
#This is buggy, has print leakage and still not very good,
|
||||||
|
#But better than nothing when a task is not self_aware or calls something that isn't
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
import functools, traceback, io
|
import functools, traceback, io
|
||||||
import random
|
import random
|
||||||
|
|
@ -173,6 +178,10 @@ class Trace( BaseModel ):
|
||||||
self.returned = None
|
self.returned = None
|
||||||
self.stderr = ""
|
self.stderr = ""
|
||||||
self.stdout = ""
|
self.stdout = ""
|
||||||
|
#Runtime Bits
|
||||||
|
self.loop = asyncio.get_running_loop()
|
||||||
|
self.new_print = asyncio.Event()
|
||||||
|
self.commit_on_new_print_task = self.loop.create_task( self.commit_on_new_print() )
|
||||||
|
|
||||||
try:
|
try:
|
||||||
func = Task.registered_tasks[self.task.name]
|
func = Task.registered_tasks[self.task.name]
|
||||||
|
|
@ -184,27 +193,12 @@ class Trace( BaseModel ):
|
||||||
finally:
|
finally:
|
||||||
await self.asave()
|
await self.asave()
|
||||||
|
|
||||||
if self.task.self_aware:
|
|
||||||
func = functools.partial(func, self)
|
|
||||||
|
|
||||||
#Create an io object to read the print output
|
|
||||||
new_print = asyncio.Event() #TODO: So we can update the db mid task in an async way
|
|
||||||
|
|
||||||
def confined_print( *args, sep = " ", end = "\n", **kwargs ):
|
|
||||||
self.stdout += sep.join( str(i) for i in args ) + end
|
|
||||||
new_print.set()
|
|
||||||
|
|
||||||
async def confined_print_flush():
|
|
||||||
while True:
|
|
||||||
await new_print.wait()
|
|
||||||
await self.asave( update_fields = ['stdout'] )
|
|
||||||
new_print.clear()
|
|
||||||
|
|
||||||
confined_print_flush_task = asyncio.create_task( confined_print_flush() )
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with patch( 'builtins.print', confined_print ):
|
if self.task.self_aware:
|
||||||
output = await func( *self.args, **self.kwargs )
|
output = await func(self, *self.args, **self.kwargs )
|
||||||
|
else:
|
||||||
|
with patch( 'builtins.print', self.print ):
|
||||||
|
output = await func( *self.args, **self.kwargs )
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.set_status( "E", f"Exception: {e}" )
|
self.set_status( "E", f"Exception: {e}" )
|
||||||
|
|
@ -215,11 +209,22 @@ class Trace( BaseModel ):
|
||||||
self.returned = output
|
self.returned = output
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
confined_print_flush_task.cancel()
|
self.commit_on_new_print_task.cancel()
|
||||||
self.last_end_datetime = timezone.now()
|
self.last_end_datetime = timezone.now()
|
||||||
await self.asave()
|
await self.asave()
|
||||||
|
|
||||||
|
|
||||||
|
### Runtime methods for self aware tasks
|
||||||
|
async def commit_on_new_print( self ):
|
||||||
|
while True:
|
||||||
|
await self.new_print.wait()
|
||||||
|
await self.asave( update_fields = ['stdout'] )
|
||||||
|
self.new_print.clear()
|
||||||
|
|
||||||
|
def print( self, *args, sep = " ", end = "\n", file = None, flush = True ): #We get 'file' here to fool tasks that aren't self_aware
|
||||||
|
assert hasattr(self, 'commit_on_new_print_task'), "trace.print needs to be called while a trace.commit_on_new_print task is active!"
|
||||||
|
self.stdout += sep.join( str(i) for i in args ) + end
|
||||||
|
if flush: self.new_print.set()
|
||||||
|
|
||||||
|
|
||||||
#https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/
|
#https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/
|
||||||
|
|
|
||||||
|
|
@ -117,8 +117,9 @@ class AsyncronWorker:
|
||||||
self.attach_django_signals()
|
self.attach_django_signals()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.loop.run_forever() #This is the lifetime of this worker
|
self.loop.run_until_complete( self.work_loop_over.wait() ) #This is the lifetime of this worker
|
||||||
except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...")
|
except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...")
|
||||||
|
except RuntimeError: self.log.info(f"[Asyncron][W{self.model.id}] Worker Stopped, exiting...")
|
||||||
else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...")
|
else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...")
|
||||||
|
|
||||||
self.loop.run_until_complete( self.graceful_shutdown() )
|
self.loop.run_until_complete( self.graceful_shutdown() )
|
||||||
|
|
@ -266,25 +267,36 @@ class AsyncronWorker:
|
||||||
|
|
||||||
|
|
||||||
async def work_loop( self ):
|
async def work_loop( self ):
|
||||||
|
from .models import Worker, Task, Trace
|
||||||
|
|
||||||
self.check_interval = 0
|
self.check_interval = 0
|
||||||
|
|
||||||
while True:
|
while await Worker.objects.filter( id = self.model.id ).aexists():
|
||||||
|
|
||||||
await asyncio.sleep( self.check_interval )
|
await asyncio.sleep( self.check_interval )
|
||||||
self.check_interval = 10
|
self.check_interval = 10
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.check_services()
|
await self.check_services()
|
||||||
await self.check_scheduled()
|
await self.check_scheduled()
|
||||||
await sync_to_async( close_old_connections )()
|
|
||||||
except OperationalError as e:
|
except OperationalError as e:
|
||||||
self.log.warning(f"[Asyncron] DB Connection Error: {e}")
|
self.log.warning(f"[Asyncron] DB Connection Error: {e}")
|
||||||
print( traceback.format_exc() )
|
print( traceback.format_exc() )
|
||||||
break
|
self.check_interval = 60 #break
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.warning(f"[Asyncron] check_scheduled failed: {e}")
|
self.log.warning(f"[Asyncron] check_scheduled failed: {e}")
|
||||||
print( traceback.format_exc() )
|
print( traceback.format_exc() )
|
||||||
self.check_interval = 20
|
self.check_interval = 20
|
||||||
|
|
||||||
|
try:
|
||||||
|
await sync_to_async( close_old_connections )()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(f"[Asyncron] close_old_connections failed: {e}")
|
||||||
|
print( traceback.format_exc() )
|
||||||
|
#break
|
||||||
|
|
||||||
|
|
||||||
self.work_loop_over.set()
|
self.work_loop_over.set()
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -314,7 +326,8 @@ class AsyncronWorker:
|
||||||
await trace.eval_related()
|
await trace.eval_related()
|
||||||
#print(f"Checking {trace} to do now: {trace.scheduled_datetime - timezone.now()}")
|
#print(f"Checking {trace} to do now: {trace.scheduled_datetime - timezone.now()}")
|
||||||
|
|
||||||
count = await Trace.objects.filter( id = trace.id, status = "S" ).aupdate( status = "W", worker_lock = self.model )
|
try: count = await Trace.objects.filter( id = trace.id, status = "S" ).aupdate( status = "W", worker_lock = self.model )
|
||||||
|
except IntegrityError: count = 0
|
||||||
if not count: continue #Lost the race condition to another worker.
|
if not count: continue #Lost the race condition to another worker.
|
||||||
|
|
||||||
self.loop.create_task( self.start_trace_on_time( trace ) )
|
self.loop.create_task( self.start_trace_on_time( trace ) )
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user