Lots of changes, mostly about model_change signals
This commit is contained in:
parent
de2885d20d
commit
243c43f38e
|
|
@ -178,10 +178,6 @@ class Trace( BaseModel ):
|
||||||
self.returned = None
|
self.returned = None
|
||||||
self.stderr = ""
|
self.stderr = ""
|
||||||
self.stdout = ""
|
self.stdout = ""
|
||||||
#Runtime Bits
|
|
||||||
self.loop = asyncio.get_running_loop()
|
|
||||||
self.new_print = asyncio.Event()
|
|
||||||
self.commit_on_new_print_task = self.loop.create_task( self.commit_on_new_print() )
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
func = Task.registered_tasks[self.task.name]
|
func = Task.registered_tasks[self.task.name]
|
||||||
|
|
@ -193,6 +189,11 @@ class Trace( BaseModel ):
|
||||||
self.set_status( "R" )
|
self.set_status( "R" )
|
||||||
await self.asave()
|
await self.asave()
|
||||||
|
|
||||||
|
#Runtime Bits
|
||||||
|
self.loop = asyncio.get_running_loop()
|
||||||
|
self.new_print = asyncio.Event()
|
||||||
|
self.commit_on_new_print_task = self.loop.create_task( self.commit_on_new_print() )
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with asyncio.timeout( None ) as tmcm:
|
async with asyncio.timeout( None ) as tmcm:
|
||||||
|
|
||||||
|
|
@ -217,7 +218,9 @@ class Trace( BaseModel ):
|
||||||
self.set_status( "C" )
|
self.set_status( "C" )
|
||||||
self.returned = output
|
self.returned = output
|
||||||
|
|
||||||
|
finally:
|
||||||
self.commit_on_new_print_task.cancel()
|
self.commit_on_new_print_task.cancel()
|
||||||
|
|
||||||
self.last_end_datetime = timezone.now()
|
self.last_end_datetime = timezone.now()
|
||||||
await self.asave()
|
await self.asave()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -116,6 +116,10 @@ class AsyncronWorker:
|
||||||
self.loop = asyncio.new_event_loop()
|
self.loop = asyncio.new_event_loop()
|
||||||
asyncio.set_event_loop( self.loop )
|
asyncio.set_event_loop( self.loop )
|
||||||
|
|
||||||
|
#Run tasks from other threads, safely
|
||||||
|
self.task_reason_jobs_queue = asyncio.Queue()
|
||||||
|
self.create_task( self.consume_task_reason_jobs_queue() )
|
||||||
|
|
||||||
#Fight over who's gonna be the master, prove your health in the process!
|
#Fight over who's gonna be the master, prove your health in the process!
|
||||||
self.create_task( retry_on_db_error(self.master_loop)() )
|
self.create_task( retry_on_db_error(self.master_loop)() )
|
||||||
main_task = self.create_task( self.work_loop() )
|
main_task = self.create_task( self.work_loop() )
|
||||||
|
|
@ -190,6 +194,10 @@ class AsyncronWorker:
|
||||||
for name in self.watching_models[instance.__class__]:
|
for name in self.watching_models[instance.__class__]:
|
||||||
task = Task.registered_tasks[name].task
|
task = Task.registered_tasks[name].task
|
||||||
|
|
||||||
|
if task.trace_set.filter( status = "R" ).exists():
|
||||||
|
#print("Will not run another trace of the same task to reduce the change of an infinite cycle.")
|
||||||
|
continue
|
||||||
|
|
||||||
if task.worker_type not in ("AR" if self.model.is_robust else "AD"): #If we can't run this trace
|
if task.worker_type not in ("AR" if self.model.is_robust else "AD"): #If we can't run this trace
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ),
|
task.ensure_quick_execution( reason = f"Change ({signal_name}) on {instance}" ),
|
||||||
|
|
@ -197,8 +205,10 @@ class AsyncronWorker:
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
#print( threading.current_thread(), signal_name, sender, signal, instance, kwargs )
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
self.start_task_now( task, reason = f"Change ({signal_name}) on {instance}" ),
|
self.task_reason_jobs_queue.put( (task, f"Change ({signal_name}) on {instance}") ),
|
||||||
self.loop
|
self.loop
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -345,6 +355,11 @@ class AsyncronWorker:
|
||||||
|
|
||||||
self.work_loop_over.set()
|
self.work_loop_over.set()
|
||||||
|
|
||||||
|
async def consume_task_reason_jobs_queue( self ):
|
||||||
|
while True:
|
||||||
|
task, reason = await self.task_reason_jobs_queue.get()
|
||||||
|
await self.start_task_now( task, reason )
|
||||||
|
self.task_reason_jobs_queue.task_done()
|
||||||
|
|
||||||
async def check_scheduled( self ):
|
async def check_scheduled( self ):
|
||||||
from .models import Task, Trace
|
from .models import Task, Trace
|
||||||
|
|
@ -376,7 +391,7 @@ class AsyncronWorker:
|
||||||
except IntegrityError: count = 0
|
except IntegrityError: count = 0
|
||||||
if not count: continue #Lost the race condition to another worker.
|
if not count: continue #Lost the race condition to another worker.
|
||||||
|
|
||||||
self.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
self.create_task( self.start_trace_on_time( trace ) )
|
||||||
|
|
||||||
async def check_services( self ):
|
async def check_services( self ):
|
||||||
from .models import Task, Trace
|
from .models import Task, Trace
|
||||||
|
|
@ -397,7 +412,7 @@ class AsyncronWorker:
|
||||||
if not locked: continue
|
if not locked: continue
|
||||||
|
|
||||||
trace = task.new_trace()
|
trace = task.new_trace()
|
||||||
trace.status = "W"
|
trace.set_status( "W", "Waiting to start the service ASAP." )
|
||||||
trace.worker_lock = self.model
|
trace.worker_lock = self.model
|
||||||
await trace.asave()
|
await trace.asave()
|
||||||
|
|
||||||
|
|
@ -421,6 +436,7 @@ class AsyncronWorker:
|
||||||
await trace.arefresh_from_db()
|
await trace.arefresh_from_db()
|
||||||
|
|
||||||
await trace.start()
|
await trace.start()
|
||||||
|
|
||||||
trace.worker_lock = None
|
trace.worker_lock = None
|
||||||
await trace.asave( update_fields = ['worker_lock'] )
|
await trace.asave( update_fields = ['worker_lock'] )
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user