Improved quick execution to be instant when the current worker supports it
This commit is contained in:
parent
155dac5fe8
commit
8fe8d5ec8c
|
|
@ -187,10 +187,10 @@ class Trace( BaseModel ):
|
||||||
func = Task.registered_tasks[self.task.name]
|
func = Task.registered_tasks[self.task.name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.set_status( "E", "Script Missing!" )
|
self.set_status( "E", "Script Missing!" )
|
||||||
|
await self.asave()
|
||||||
return
|
return
|
||||||
else:
|
|
||||||
self.set_status( "R" )
|
self.set_status( "R" )
|
||||||
finally:
|
|
||||||
await self.asave()
|
await self.asave()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -171,10 +171,22 @@ class AsyncronWorker:
|
||||||
def model_changed( self, signal_name, sender, signal, instance, **kwargs ):
|
def model_changed( self, signal_name, sender, signal, instance, **kwargs ):
|
||||||
from .models import Task
|
from .models import Task
|
||||||
for name in self.watching_models[instance.__class__]:
|
for name in self.watching_models[instance.__class__]:
|
||||||
|
task = Task.registered_tasks[name].task
|
||||||
|
|
||||||
|
if task.worker_type not in ("AR" if self.model.is_robust else "AD"): #If we can't run this trace
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
Task.registered_tasks[name].task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ),
|
task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ),
|
||||||
self.loop
|
self.loop
|
||||||
)
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
asyncio.run_coroutine_threadsafe(
|
||||||
|
self.start_task_now( task ),
|
||||||
|
self.loop
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def graceful_shutdown( self ):
|
async def graceful_shutdown( self ):
|
||||||
try:
|
try:
|
||||||
|
|
@ -335,29 +347,6 @@ class AsyncronWorker:
|
||||||
|
|
||||||
self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
||||||
|
|
||||||
async def start_trace_on_time( self, trace ):
|
|
||||||
from .models import Trace
|
|
||||||
|
|
||||||
if trace.task.timeout: #If this is from a periodic task
|
|
||||||
await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() )
|
|
||||||
await trace.arefresh_from_db()
|
|
||||||
|
|
||||||
await trace.start()
|
|
||||||
trace.worker_lock = None
|
|
||||||
await trace.asave( update_fields = ['worker_lock'] )
|
|
||||||
|
|
||||||
#Traces for the same task that we are done with (Completed, Aborted, Errored)
|
|
||||||
QuerySet = Trace.objects.filter(
|
|
||||||
task_id = trace.task_id, status__in = "CAE", protected = False, worker_lock = None
|
|
||||||
).order_by('-register_datetime')
|
|
||||||
|
|
||||||
#Should be deleted after the threashold
|
|
||||||
max_count = trace.task.max_completed_traces if trace.status == "C" else trace.task.max_failed_traces
|
|
||||||
await QuerySet.exclude(
|
|
||||||
id__in = QuerySet[:max_count].values_list( 'id', flat = True )
|
|
||||||
).adelete()
|
|
||||||
|
|
||||||
|
|
||||||
async def check_services( self ):
|
async def check_services( self ):
|
||||||
from .models import Task, Trace
|
from .models import Task, Trace
|
||||||
|
|
||||||
|
|
@ -382,3 +371,33 @@ class AsyncronWorker:
|
||||||
#self.running_service_tasks[task.id] =
|
#self.running_service_tasks[task.id] =
|
||||||
self.loop.create_task( self.start_trace_on_time( trace ) )
|
self.loop.create_task( self.start_trace_on_time( trace ) )
|
||||||
await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None )
|
await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None )
|
||||||
|
|
||||||
|
async def start_task_now( self, task ):
|
||||||
|
trace = task.new_trace()
|
||||||
|
trace.set_status( "S", f"Change ({signal_name}) on {instance}" )
|
||||||
|
trace.scheduled_datetime = timezone.now() #So it runs instantly
|
||||||
|
trace.worker_lock_id = self.model.id
|
||||||
|
|
||||||
|
self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
||||||
|
|
||||||
|
async def start_trace_on_time( self, trace ):
|
||||||
|
from .models import Trace
|
||||||
|
|
||||||
|
if trace.scheduled_datetime and timezone.now() < trace.scheduled_datetime: #If this is a periodic task
|
||||||
|
await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() )
|
||||||
|
await trace.arefresh_from_db()
|
||||||
|
|
||||||
|
await trace.start()
|
||||||
|
trace.worker_lock = None
|
||||||
|
await trace.asave( update_fields = ['worker_lock'] )
|
||||||
|
|
||||||
|
#Traces for the same task that we are done with (Completed, Aborted, Errored)
|
||||||
|
QuerySet = Trace.objects.filter(
|
||||||
|
task_id = trace.task_id, status__in = "CAE", protected = False, worker_lock = None
|
||||||
|
).order_by('-register_datetime')
|
||||||
|
|
||||||
|
#Should be deleted after the threashold
|
||||||
|
max_count = trace.task.max_completed_traces if trace.status == "C" else trace.task.max_failed_traces
|
||||||
|
await QuerySet.exclude(
|
||||||
|
id__in = QuerySet[:max_count].values_list( 'id', flat = True )
|
||||||
|
).adelete()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user