|
|
|
|
@@ -16,10 +16,9 @@ import collections, functools
|
|
|
|
|
import random
|
|
|
|
|
|
|
|
|
|
from .utils import retry_on_db_error, ignore_on_db_error
|
|
|
|
|
from .asynctools import AsyncOriented
|
|
|
|
|
|
|
|
|
|
ASYNC_TASKS_CLEANUP_THRESHOLD = 64
|
|
|
|
|
|
|
|
|
|
class AsyncronWorker:
|
|
|
|
|
class AsyncronWorker( AsyncOriented ):
|
|
|
|
|
"""
|
|
|
|
|
The asyncron worker for a process, limited to one per proccess.
|
|
|
|
|
Other threads on the same process, can offload their work to this thread and it's single async event loop.
|
|
|
|
|
@@ -30,7 +29,7 @@ class AsyncronWorker:
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
IS_ACTIVE = False #Wheter this proccess needs an AsyncronWorker at all, this prevents running the worker on all the things that cause asyncron.app.ready to run!
|
|
|
|
|
IS_ACTIVE = False #Whether this proccess needs an AsyncronWorker at all, this prevents running the worker on all the things that cause asyncron.app.ready to run!
|
|
|
|
|
INSTANCE = None #Singelton
|
|
|
|
|
INIT_CALLBACKS = [] #Once the singelton is created, it'll run through the callbacks with itself as the first arg.
|
|
|
|
|
|
|
|
|
|
@@ -44,6 +43,10 @@ class AsyncronWorker:
|
|
|
|
|
signal.SIGTERM
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
LOG_NAME = "asyncron.worker"
|
|
|
|
|
LOG_FMT = r"%(asctime)s [%(process)d.Asyncron] [%(levelname)s] %(message)s", r"[%Y-%m-%d %H:%M:%S %z]"
|
|
|
|
|
LOG_LEVEL = logging.DEBUG
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def register_init_callback( cls, callback ):
|
|
|
|
|
assert not cls.INSTANCE, "Cannot register new callbacks after the worker is created!"
|
|
|
|
|
@@ -66,21 +69,7 @@ class AsyncronWorker:
|
|
|
|
|
cls.INSTANCE.log.debug("Worker created for this process.")
|
|
|
|
|
return cls.INSTANCE
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def log( self ):
|
|
|
|
|
if hasattr( self, '_log' ): return self._log
|
|
|
|
|
log = logging.getLogger("asyncron.worker")
|
|
|
|
|
handler = logging.StreamHandler()
|
|
|
|
|
#Taken from gunicorn, so it looks similar in the merged output
|
|
|
|
|
formatter = logging.Formatter(
|
|
|
|
|
r"%(asctime)s [%(process)d] [%(levelname)s] [Asyncron] %(message)s",
|
|
|
|
|
r"[%Y-%m-%d %H:%M:%S %z]"
|
|
|
|
|
)
|
|
|
|
|
handler.setFormatter(formatter)
|
|
|
|
|
log.addHandler(handler)
|
|
|
|
|
log.setLevel(logging.DEBUG)
|
|
|
|
|
self._log = log
|
|
|
|
|
return log
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def register_with_exit_signals( self ):
|
|
|
|
|
"""
|
|
|
|
|
@@ -116,12 +105,11 @@ class AsyncronWorker:
|
|
|
|
|
def handle_new_db_connection( self, sender, **kwargs ):
|
|
|
|
|
if self.is_db_ready: return
|
|
|
|
|
self.log.debug(f"First DB connection: {sender}")
|
|
|
|
|
from .models import Worker, Task, Trace
|
|
|
|
|
|
|
|
|
|
self.is_db_ready = True
|
|
|
|
|
django_signals.connection_created.disconnect( self.handle_new_db_connection )
|
|
|
|
|
|
|
|
|
|
if not self.loop: return
|
|
|
|
|
if not self.loop: return #We're not inside an async runner in this(?) or another thread.
|
|
|
|
|
self.loop.call_soon_threadsafe( self.is_db_ready_event.set )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -141,7 +129,7 @@ class AsyncronWorker:
|
|
|
|
|
def stop( self, reason = None ):
|
|
|
|
|
if self.is_stopping: return #TODO: Insisting on exiting faster should probably be managed in the signal handler
|
|
|
|
|
|
|
|
|
|
self.log.info(f"Stopping Worker: {reason}")
|
|
|
|
|
self.log.info( f"Stopping Worker: {reason}" )
|
|
|
|
|
self.is_stopping = True
|
|
|
|
|
|
|
|
|
|
if not self.loop: return
|
|
|
|
|
@@ -157,10 +145,6 @@ class AsyncronWorker:
|
|
|
|
|
|
|
|
|
|
def __init__( self ):
|
|
|
|
|
|
|
|
|
|
#Used only inside create_task method
|
|
|
|
|
self.tasks_next_cleanup = ASYNC_TASKS_CLEANUP_THRESHOLD
|
|
|
|
|
self.tasks_running = []
|
|
|
|
|
|
|
|
|
|
#These booleans have asyncio.Event counterparts in the loop context
|
|
|
|
|
self.is_db_ready = False
|
|
|
|
|
self.is_stopping = False
|
|
|
|
|
@@ -170,17 +154,14 @@ class AsyncronWorker:
|
|
|
|
|
|
|
|
|
|
#Parallelism
|
|
|
|
|
self.thread = None #Once the worker starts, it'll be populated
|
|
|
|
|
self.loop = None #Once the event loop is ready, it'll be populated
|
|
|
|
|
|
|
|
|
|
self.clearing_dead_workers = False
|
|
|
|
|
self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks
|
|
|
|
|
|
|
|
|
|
self.database_unreachable = False
|
|
|
|
|
|
|
|
|
|
def event_loop_init( self ):
|
|
|
|
|
assert not self.loop, "This worker already has a running even loop!"
|
|
|
|
|
self.loop = asyncio.new_event_loop()
|
|
|
|
|
asyncio.set_event_loop( self.loop )
|
|
|
|
|
async def startup( self ):
|
|
|
|
|
await super().startup()
|
|
|
|
|
|
|
|
|
|
#These asyncio.Events have booleans counterparts in the __init__ section for main thread non async logic
|
|
|
|
|
self.is_db_ready_event = asyncio.Event()
|
|
|
|
|
@@ -188,84 +169,66 @@ class AsyncronWorker:
|
|
|
|
|
|
|
|
|
|
self.task_reason_jobs_queue = asyncio.Queue() #Run tasks from other threads, safely
|
|
|
|
|
|
|
|
|
|
def create_task( self, coro, *, silent = False, name = None, context = None ):
|
|
|
|
|
|
|
|
|
|
if not silent:
|
|
|
|
|
async def wrapped():
|
|
|
|
|
try:
|
|
|
|
|
return await wrapped.coro
|
|
|
|
|
except KeyboardInterrupt as e:
|
|
|
|
|
self.log.debug(f"Task {wrapped.coro} {name} Interrupted with {e}." )
|
|
|
|
|
raise
|
|
|
|
|
except:
|
|
|
|
|
self.log.warning(f"Task Error {wrapped.coro} {name}:\n{traceback.format_exc()}" )
|
|
|
|
|
raise
|
|
|
|
|
wrapped.coro = coro
|
|
|
|
|
coro = wrapped
|
|
|
|
|
|
|
|
|
|
task = self.loop.create_task( coro(), name = name, context = context )
|
|
|
|
|
self.tasks_running.append( task )
|
|
|
|
|
|
|
|
|
|
if self.tasks_next_cleanup < len(self.tasks_running):
|
|
|
|
|
task_count = len(self.tasks_running)
|
|
|
|
|
for t in list(self.tasks_running):
|
|
|
|
|
if t.done(): self.tasks_running.remove(t)
|
|
|
|
|
self.tasks_next_cleanup = ASYNC_TASKS_CLEANUP_THRESHOLD + len(self.tasks_running)
|
|
|
|
|
self.log.debug(
|
|
|
|
|
f"Cleaned up {task_count - len(self.tasks_running)} of {len(self.tasks_running)} tasks,"
|
|
|
|
|
f" next cleanup at: {self.tasks_next_cleanup}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return task
|
|
|
|
|
|
|
|
|
|
def start_working( self, is_robust = False ):
|
|
|
|
|
self.event_loop_init()
|
|
|
|
|
with asyncio.Runner() as runner:
|
|
|
|
|
|
|
|
|
|
if self.start_after_db_ready:
|
|
|
|
|
self.log.debug("Waiting on another module to create the first database connection...")
|
|
|
|
|
self.loop.run_until_complete( self.is_db_ready_event.wait() )
|
|
|
|
|
runner.run( self.startup() )
|
|
|
|
|
|
|
|
|
|
from .models import Worker, Task, Trace
|
|
|
|
|
self.model = Worker( pid = os.getpid(), thread_id = threading.get_ident(), is_robust = is_robust )
|
|
|
|
|
if self.start_after_db_ready:
|
|
|
|
|
self.log.debug("Waiting on another module to create the first database connection...")
|
|
|
|
|
runner.run( self.is_db_ready_event.wait() )
|
|
|
|
|
|
|
|
|
|
self.create_task( self.consume_task_reason_jobs_queue() )
|
|
|
|
|
from .models import Worker, Task, Trace
|
|
|
|
|
self.model = Worker( pid = os.getpid(), thread_id = threading.get_ident(), is_robust = is_robust )
|
|
|
|
|
|
|
|
|
|
self.create_task( self.consume_task_reason_jobs_queue() )
|
|
|
|
|
|
|
|
|
|
self.create_task( self.master_main() )
|
|
|
|
|
self.create_task( self.work_loop() )
|
|
|
|
|
|
|
|
|
|
self.model.save() #likley Avoid's the django initialization warning, since waited for is_db_ready_event above!
|
|
|
|
|
self.model.refresh_from_db()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#Fight over who's gonna be the master, prove your health in the process!
|
|
|
|
|
self.create_task( retry_on_db_error(self.master_loop)() )
|
|
|
|
|
main_task = self.create_task( self.work_loop() )
|
|
|
|
|
#Fill in the ID fields of the tasks we didn't dare to check with db until now
|
|
|
|
|
from .models import Task
|
|
|
|
|
for func in Task.registered_tasks.values():
|
|
|
|
|
task = func.task
|
|
|
|
|
if not task.pk:
|
|
|
|
|
try: task.pk = Task.objects.get( name = task.name ).pk
|
|
|
|
|
except Task.DoesNotExist: pass #It's a new one, it's fine.
|
|
|
|
|
|
|
|
|
|
#time.sleep(0.3) #To avoid the django initialization warning!
|
|
|
|
|
self.model.save()
|
|
|
|
|
self.model.refresh_from_db()
|
|
|
|
|
self.attach_django_signals()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
runner.run( self.is_stopping_event.wait() ) #This is the lifetime of this worker
|
|
|
|
|
|
|
|
|
|
#Fill in the ID fields of the tasks we didn't dare to check with db until now
|
|
|
|
|
from .models import Task
|
|
|
|
|
for func in Task.registered_tasks.values():
|
|
|
|
|
task = func.task
|
|
|
|
|
if not task.pk:
|
|
|
|
|
try: task.pk = Task.objects.get( name = task.name ).pk
|
|
|
|
|
except Task.DoesNotExist: pass #It's a new one, it's fine.
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
self.log.info(f"[W{self.model.id}] Worker Received KeyboardInterrupt, exiting...")
|
|
|
|
|
|
|
|
|
|
self.attach_django_signals()
|
|
|
|
|
except RuntimeError:
|
|
|
|
|
self.log.info(f"[W{self.model.id}] Worker Stopped, exiting...")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.loop.run_until_complete( self.is_stopping_event.wait() ) #This is the lifetime of this worker
|
|
|
|
|
except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...")
|
|
|
|
|
except RuntimeError: self.log.info(f"[Asyncron][W{self.model.id}] Worker Stopped, exiting...")
|
|
|
|
|
else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...")
|
|
|
|
|
else:
|
|
|
|
|
self.log.info(f"[W{self.model.id}] Worker exiting...")
|
|
|
|
|
|
|
|
|
|
self.loop.run_until_complete( self.graceful_shutdown() )
|
|
|
|
|
#We must catch sigterm/KeyboardInterrupt multiple times:
|
|
|
|
|
# 1 - Stop accepting new traces & let self_aware traces know they are in gracetime.
|
|
|
|
|
# 2 - abort all
|
|
|
|
|
runner.run( self.graceful_shutdown() )
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update(
|
|
|
|
|
status_reason = "Worker died during execution",
|
|
|
|
|
status = "A", worker_lock = None
|
|
|
|
|
)
|
|
|
|
|
except: pass
|
|
|
|
|
runner.run( self.cleanup() )
|
|
|
|
|
|
|
|
|
|
#if count: print(f"Had to cancel {count} task(s).") #cls.log.warning
|
|
|
|
|
try:
|
|
|
|
|
count = Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update(
|
|
|
|
|
status_reason = "Worker died during execution",
|
|
|
|
|
status = "A", worker_lock = None
|
|
|
|
|
)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.log.info(f"Marking traces as 'Aborted' Failed: {e}")
|
|
|
|
|
else:
|
|
|
|
|
if count: self.log.info(f"Marked {count} trace(s) as 'Aborted'.")
|
|
|
|
|
|
|
|
|
|
# Almost always, a worker can delete it's model from the db,
|
|
|
|
|
# But it seems that there is a sitation where despite Task.worker_lock being delete=SET_NULL,
|
|
|
|
|
@@ -277,12 +240,13 @@ class AsyncronWorker:
|
|
|
|
|
for attempt in range(3):
|
|
|
|
|
try:
|
|
|
|
|
self.model.delete()
|
|
|
|
|
except IntegrityError:
|
|
|
|
|
except IntegrityError as e:
|
|
|
|
|
self.log.warning(f"Deleting worker (self) failed {attempt+1} time(s): {e}")
|
|
|
|
|
time.sleep( 0.1 )
|
|
|
|
|
else: break
|
|
|
|
|
|
|
|
|
|
self.log.debug("Worker stopped working.")
|
|
|
|
|
#self.loop.call_soon(self.started.set)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def attach_django_signals( self ):
|
|
|
|
|
django_name_to_signals = {
|
|
|
|
|
@@ -319,12 +283,13 @@ class AsyncronWorker:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
#print( threading.current_thread(), signal_name, sender, signal, instance, kwargs )
|
|
|
|
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(
|
|
|
|
|
self.task_reason_jobs_queue.put( (task, f"Change ({signal_name}) on {instance}") ),
|
|
|
|
|
self.loop
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
asyncio.run_coroutine_threadsafe(
|
|
|
|
|
self.task_reason_jobs_queue.put( (task, f"Change ({signal_name}) on {instance}") ),
|
|
|
|
|
self.loop
|
|
|
|
|
)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.log.warning(f"Model {task} Threadsafe execution Change ({signal_name}) on {instance} failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -334,34 +299,43 @@ class AsyncronWorker:
|
|
|
|
|
if not await self.model.trace_set.aexists():
|
|
|
|
|
break
|
|
|
|
|
await asyncio.sleep( attempt / 10 )
|
|
|
|
|
else: self.log.info(f"[Asyncron][W{self.model.id}] Graceful shutdown not graceful enough!")
|
|
|
|
|
else: self.log.info(f"[W{self.model.id}] Graceful shutdown not graceful enough!")
|
|
|
|
|
except: await asyncio.sleep( 1 )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def master_loop( self ):
|
|
|
|
|
async def master_main( self ):
|
|
|
|
|
"""
|
|
|
|
|
Fight over who's gonna be the master.
|
|
|
|
|
Prove your health in the process!
|
|
|
|
|
"""
|
|
|
|
|
from .models import Worker, Task, Trace
|
|
|
|
|
|
|
|
|
|
#Delete dead masters every now and then!
|
|
|
|
|
last_overtake_attempt = 0
|
|
|
|
|
current_master = False
|
|
|
|
|
is_current_master = False
|
|
|
|
|
next_overtake_attempt = time.time() + 1 + random.random() * 5
|
|
|
|
|
loop_wait = 5
|
|
|
|
|
|
|
|
|
|
MyQs = Worker.objects.filter( id = self.model.id )
|
|
|
|
|
while await MyQs.aupdate( last_activity = timezone.now() ):
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
await Worker.objects.filter( is_master = False ).aupdate( is_master = models.Q(id = self.model.id) )
|
|
|
|
|
|
|
|
|
|
except RuntimeError as e: #Syntax Error cause: cannot schedule new futures after interpreter shutdown
|
|
|
|
|
self.log.critical(f"[W{self.model.id}] Master Loop Runtime Error:", e )
|
|
|
|
|
if "interpreter shutdown" not in e.args[0]: raise
|
|
|
|
|
self.database_unreachable = True
|
|
|
|
|
loop_wait = 0
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
except IntegrityError: # I'm not master!
|
|
|
|
|
loop_wait = 5 + random.random() * 15
|
|
|
|
|
|
|
|
|
|
if current_master: self.log.info(f"[Asyncron][W{self.model.id}] No longer master.")
|
|
|
|
|
current_master = False
|
|
|
|
|
if is_current_master: self.log.info(f"[W{self.model.id}] No longer master.")
|
|
|
|
|
is_current_master = False
|
|
|
|
|
|
|
|
|
|
if last_overtake_attempt + 60 < time.time():
|
|
|
|
|
last_overtake_attempt = time.time()
|
|
|
|
|
#Deletes dead masters every now and then!
|
|
|
|
|
if next_overtake_attempt <= time.time():
|
|
|
|
|
next_overtake_attempt = time.time() + 60
|
|
|
|
|
took_master = False
|
|
|
|
|
|
|
|
|
|
if self.model.is_robust:
|
|
|
|
|
@@ -370,15 +344,16 @@ class AsyncronWorker:
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
await Worker.objects.filter( is_master = True ).filter(
|
|
|
|
|
models.Q(last_crowning_attempt = None) | models.Q(last_crowning_attempt__lte = timezone.now() - timezone.timedelta( minutes = 5 ))
|
|
|
|
|
models.Q( last_activity = None ) |
|
|
|
|
|
models.Q( last_activity__lte = timezone.now() - timezone.timedelta( minutes = 2 ) )
|
|
|
|
|
).aupdate( is_master = False )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else: #I am Master!
|
|
|
|
|
loop_wait = 2 + random.random() * 3
|
|
|
|
|
|
|
|
|
|
if not current_master: self.log.info(f"[Asyncron][W{self.model.id}] Running as master.")
|
|
|
|
|
current_master = True
|
|
|
|
|
if not is_current_master: self.log.info(f"[W{self.model.id}] Running as master.")
|
|
|
|
|
is_current_master = True
|
|
|
|
|
|
|
|
|
|
if not self.clearing_dead_workers:
|
|
|
|
|
self.create_task( self.clear_dead_workers() )
|
|
|
|
|
@@ -386,10 +361,11 @@ class AsyncronWorker:
|
|
|
|
|
await self.sync_tasks()
|
|
|
|
|
await self.clear_orphaned_traces()
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
if not self.database_unreachable:
|
|
|
|
|
await Worker.objects.filter( id = self.model.id ).aupdate( last_crowning_attempt = timezone.now() )
|
|
|
|
|
await asyncio.sleep( loop_wait )
|
|
|
|
|
if loop_wait:
|
|
|
|
|
await asyncio.sleep( loop_wait )
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
self.log.warning(f"[W{self.model.id}] Worker in Master Loop Cannot find it's corresponding database model!")
|
|
|
|
|
|
|
|
|
|
async def clear_orphaned_traces( self ):
|
|
|
|
|
from .models import Worker, Task, Trace
|
|
|
|
|
@@ -399,11 +375,11 @@ class AsyncronWorker:
|
|
|
|
|
self.clearing_dead_workers = True
|
|
|
|
|
from .models import Worker, Task, Trace
|
|
|
|
|
await Worker.objects.filter(
|
|
|
|
|
last_crowning_attempt__lte = timezone.now() - timezone.timedelta( seconds = 30 ),
|
|
|
|
|
last_activity__lte = timezone.now() - timezone.timedelta( seconds = 30 ),
|
|
|
|
|
in_grace = False
|
|
|
|
|
).aupdate( in_grace = True )
|
|
|
|
|
|
|
|
|
|
async for worker in Worker.objects.filter( in_grace = False, last_crowning_attempt = None ):
|
|
|
|
|
async for worker in Worker.objects.filter( in_grace = False, last_activity = None ):
|
|
|
|
|
if not await sync_to_async( worker.is_proc_alive )():
|
|
|
|
|
await worker.adelete()
|
|
|
|
|
|
|
|
|
|
|