diff --git a/asyncron/admin.py b/asyncron/admin.py index c96a1b4..3aeac80 100644 --- a/asyncron/admin.py +++ b/asyncron/admin.py @@ -34,15 +34,36 @@ class WorkerAdmin( BaseModelAdmin ): is_running.boolean = True def health( self, obj ): - return (f"In Grace " if obj.in_grace else "") + humanize.naturaltime( obj.last_crowning_attempt ) + return (f"In Grace " if obj.in_grace else "") + humanize.naturaltime( obj.last_activity ) + + +class TaskAppFilter( admin.SimpleListFilter ): #TODO: Also check if it's an actuall app, same for trace + title = "app" + parameter_name = 'app_groups' + + def lookups( self, request, model_admin ): + return ( + ( a.lower(), a ) + for a in sorted({ + task.name.split(".", 1)[0] + for task in Task.objects.all() + }) if a + ) + + def queryset( self, request, queryset ): + q = self.value() + if not q: return queryset + return queryset.filter( name__istartswith = q ) + @admin.register( Task ) class TaskAdmin( BaseModelAdmin ): order = 1 list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged', 'last_execution', 'scheduled' + list_filter = TaskAppFilter, fields = ["name", "description", "type", "on_model_change", "jitter", "self_aware"] - actions = 'schedule_execution', 'execution_now', 'delete_script_missing' + actions = 'schedule_execution', 'execution_now', 'delete_script_missing', 'update_details', def has_add_permission( self, request, obj = None ): return False def has_delete_permission( self, request, obj = None ): return False def has_change_permission( self, request, obj = None ): return False @@ -126,6 +147,12 @@ class TaskAdmin( BaseModelAdmin ): if task.name not in task.registered_tasks: task.delete() + @admin.action( description = "Check for updates in details of this tasks" ) + def update_details( self, request, qs ): + for task in qs: + if task.name not in task.registered_tasks: continue + task_f = task.registered_tasks[task.name] + print( "T:", task, task_f ) class TraceAppFilter(admin.SimpleListFilter): title = "app" diff --git a/asyncron/apps.py b/asyncron/apps.py index 76ae4e6..9e3755c 100644 --- a/asyncron/apps.py +++ b/asyncron/apps.py @@ -99,7 +99,9 @@ class AsyncronConfig(AppConfig): for field_name, field_kwarg_value in field_nv.items(): if not hasattr(model, field_name): continue - field = getattr(model, field_name).field + try: field = getattr(model, field_name).field + except AttributeError: continue + if not getattr(field, field_kwarg, None): #Set only as default or empty replacements setattr(field, field_kwarg, field_kwarg_value) diff --git a/asyncron/gunicorn.py b/asyncron/gunicorn.py index e2e87e7..9782443 100644 --- a/asyncron/gunicorn.py +++ b/asyncron/gunicorn.py @@ -11,7 +11,14 @@ def post_fork( server, worker ): #worker and AsyncronWorker, pay attention! post_fork.server = server post_fork.worker = worker - from .workers import AsyncronWorker + #Temporary so I can work on this. + import time + for attempt in range(3): + try: from .workers import AsyncronWorker + except ImportError: time.sleep(0.1) + else: break + else: raise ImportError() + AsyncronWorker.IS_ACTIVE = True AsyncronWorker.register_init_callback( _patch ) diff --git a/asyncron/migrations/0004_alter_task_gracetime.py b/asyncron/migrations/0004_alter_task_gracetime.py new file mode 100644 index 0000000..98fad57 --- /dev/null +++ b/asyncron/migrations/0004_alter_task_gracetime.py @@ -0,0 +1,19 @@ +# Generated by Django 5.2.5 on 2025-11-22 16:54 + +import datetime +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('asyncron', '0003_alter_task_self_aware'), + ] + + operations = [ + migrations.AlterField( + model_name='task', + name='gracetime', + field=models.DurationField(default=datetime.timedelta(seconds=1)), + ), + ] diff --git a/asyncron/migrations/0005_rename_last_crowning_attempt_worker_last_activity.py b/asyncron/migrations/0005_rename_last_crowning_attempt_worker_last_activity.py new file mode 100644 index 0000000..aa9d273 --- /dev/null +++ b/asyncron/migrations/0005_rename_last_crowning_attempt_worker_last_activity.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.5 on 2025-11-22 19:09 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('asyncron', '0004_alter_task_gracetime'), + ] + + operations = [ + migrations.RenameField( + model_name='worker', + old_name='last_crowning_attempt', + new_name='last_activity', + ), + ] diff --git a/asyncron/models.py b/asyncron/models.py index 2683903..ff883da 100644 --- a/asyncron/models.py +++ b/asyncron/models.py @@ -16,14 +16,17 @@ import asyncio # Create your models here. from .base.models import BaseModel class Worker( BaseModel ): + pid = models.IntegerField() thread_id = models.PositiveBigIntegerField() + is_robust = models.BooleanField( default = False ) is_master = models.BooleanField( default = False ) in_grace = models.BooleanField( default = False ) #If the worker sees this as True, it should kill itself! + last_activity = models.DateTimeField( null = True, blank = True ) + #Variables with very feel good names! :) - last_crowning_attempt = models.DateTimeField( null = True, blank = True ) consumption_interval_seconds = models.IntegerField( default = 10 ) consumption_total_active = models.IntegerField( default = 0 ) def __str__( self ): return f"P{self.pid}W{self.thread_id}" + ("R" if self.is_robust else "D") @@ -43,7 +46,9 @@ class Worker( BaseModel ): else: return True # no error, we can send a signal to the process class Task( BaseModel ): - registered_tasks = {} #Name -> self + + registered_tasks = {} #Name -> the actual function + name = models.TextField( unique = True ) #Path to the function worker_lock = models.ForeignKey( Worker, null = True, blank = True, on_delete = models.SET_NULL ) worker_type = models.CharField( default = "A", choices = { @@ -59,7 +64,7 @@ class Task( BaseModel ): default = timezone.timedelta( minutes = 5 ), null = True, blank = True ) #None will mean it's a "service" like task - gracetime = models.DurationField( default = timezone.timedelta( minutes = 1 ) ) + gracetime = models.DurationField( default = timezone.timedelta( seconds = 1 ) ) self_aware = models.BooleanField( default = True, help_text = "Whether It's first argument is 'self', being a trace instance." ) #Periodic Tasks diff --git a/asyncron/workers.py b/asyncron/workers.py index c62a4f6..3143516 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -16,10 +16,9 @@ import collections, functools import random from .utils import retry_on_db_error, ignore_on_db_error +from .asynctools import AsyncOriented -ASYNC_TASKS_CLEANUP_THRESHOLD = 64 - -class AsyncronWorker: +class AsyncronWorker( AsyncOriented ): """ The asyncron worker for a process, limited to one per proccess. Other threads on the same process, can offload their work to this thread and it's single async event loop. @@ -30,7 +29,7 @@ class AsyncronWorker: """ - IS_ACTIVE = False #Wheter this proccess needs an AsyncronWorker at all, this prevents running the worker on all the things that cause asyncron.app.ready to run! + IS_ACTIVE = False #Whether this proccess needs an AsyncronWorker at all, this prevents running the worker on all the things that cause asyncron.app.ready to run! INSTANCE = None #Singelton INIT_CALLBACKS = [] #Once the singelton is created, it'll run through the callbacks with itself as the first arg. @@ -44,6 +43,10 @@ class AsyncronWorker: signal.SIGTERM ] + LOG_NAME = "asyncron.worker" + LOG_FMT = r"%(asctime)s [%(process)d.Asyncron] [%(levelname)s] %(message)s", r"[%Y-%m-%d %H:%M:%S %z]" + LOG_LEVEL = logging.DEBUG + @classmethod def register_init_callback( cls, callback ): assert not cls.INSTANCE, "Cannot register new callbacks after the worker is created!" @@ -66,21 +69,7 @@ class AsyncronWorker: cls.INSTANCE.log.debug("Worker created for this process.") return cls.INSTANCE - @property - def log( self ): - if hasattr( self, '_log' ): return self._log - log = logging.getLogger("asyncron.worker") - handler = logging.StreamHandler() - #Taken from gunicorn, so it looks similar in the merged output - formatter = logging.Formatter( - r"%(asctime)s [%(process)d] [%(levelname)s] [Asyncron] %(message)s", - r"[%Y-%m-%d %H:%M:%S %z]" - ) - handler.setFormatter(formatter) - log.addHandler(handler) - log.setLevel(logging.DEBUG) - self._log = log - return log + def register_with_exit_signals( self ): """ @@ -116,12 +105,11 @@ class AsyncronWorker: def handle_new_db_connection( self, sender, **kwargs ): if self.is_db_ready: return self.log.debug(f"First DB connection: {sender}") - from .models import Worker, Task, Trace self.is_db_ready = True django_signals.connection_created.disconnect( self.handle_new_db_connection ) - if not self.loop: return + if not self.loop: return #We're not inside an async runner in this(?) or another thread. self.loop.call_soon_threadsafe( self.is_db_ready_event.set ) @@ -141,7 +129,7 @@ class AsyncronWorker: def stop( self, reason = None ): if self.is_stopping: return #TODO: Insisting on exiting faster should probably be managed in the signal handler - self.log.info(f"Stopping Worker: {reason}") + self.log.info( f"Stopping Worker: {reason}" ) self.is_stopping = True if not self.loop: return @@ -157,10 +145,6 @@ class AsyncronWorker: def __init__( self ): - #Used only inside create_task method - self.tasks_next_cleanup = ASYNC_TASKS_CLEANUP_THRESHOLD - self.tasks_running = [] - #These booleans have asyncio.Event counterparts in the loop context self.is_db_ready = False self.is_stopping = False @@ -170,17 +154,14 @@ class AsyncronWorker: #Parallelism self.thread = None #Once the worker starts, it'll be populated - self.loop = None #Once the event loop is ready, it'll be populated self.clearing_dead_workers = False self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks self.database_unreachable = False - def event_loop_init( self ): - assert not self.loop, "This worker already has a running even loop!" - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop( self.loop ) + async def startup( self ): + await super().startup() #These asyncio.Events have booleans counterparts in the __init__ section for main thread non async logic self.is_db_ready_event = asyncio.Event() @@ -188,84 +169,66 @@ class AsyncronWorker: self.task_reason_jobs_queue = asyncio.Queue() #Run tasks from other threads, safely - def create_task( self, coro, *, silent = False, name = None, context = None ): - - if not silent: - async def wrapped(): - try: - return await wrapped.coro - except KeyboardInterrupt as e: - self.log.debug(f"Task {wrapped.coro} {name} Interrupted with {e}." ) - raise - except: - self.log.warning(f"Task Error {wrapped.coro} {name}:\n{traceback.format_exc()}" ) - raise - wrapped.coro = coro - coro = wrapped - - task = self.loop.create_task( coro(), name = name, context = context ) - self.tasks_running.append( task ) - - if self.tasks_next_cleanup < len(self.tasks_running): - task_count = len(self.tasks_running) - for t in list(self.tasks_running): - if t.done(): self.tasks_running.remove(t) - self.tasks_next_cleanup = ASYNC_TASKS_CLEANUP_THRESHOLD + len(self.tasks_running) - self.log.debug( - f"Cleaned up {task_count - len(self.tasks_running)} of {len(self.tasks_running)} tasks," - f" next cleanup at: {self.tasks_next_cleanup}" - ) - - return task def start_working( self, is_robust = False ): - self.event_loop_init() + with asyncio.Runner() as runner: - if self.start_after_db_ready: - self.log.debug("Waiting on another module to create the first database connection...") - self.loop.run_until_complete( self.is_db_ready_event.wait() ) + runner.run( self.startup() ) - from .models import Worker, Task, Trace - self.model = Worker( pid = os.getpid(), thread_id = threading.get_ident(), is_robust = is_robust ) + if self.start_after_db_ready: + self.log.debug("Waiting on another module to create the first database connection...") + runner.run( self.is_db_ready_event.wait() ) - self.create_task( self.consume_task_reason_jobs_queue() ) + from .models import Worker, Task, Trace + self.model = Worker( pid = os.getpid(), thread_id = threading.get_ident(), is_robust = is_robust ) + + self.create_task( self.consume_task_reason_jobs_queue() ) + + self.create_task( self.master_main() ) + self.create_task( self.work_loop() ) + + self.model.save() #likley Avoid's the django initialization warning, since waited for is_db_ready_event above! + self.model.refresh_from_db() - #Fight over who's gonna be the master, prove your health in the process! - self.create_task( retry_on_db_error(self.master_loop)() ) - main_task = self.create_task( self.work_loop() ) + #Fill in the ID fields of the tasks we didn't dare to check with db until now + from .models import Task + for func in Task.registered_tasks.values(): + task = func.task + if not task.pk: + try: task.pk = Task.objects.get( name = task.name ).pk + except Task.DoesNotExist: pass #It's a new one, it's fine. - #time.sleep(0.3) #To avoid the django initialization warning! - self.model.save() - self.model.refresh_from_db() + self.attach_django_signals() + try: + runner.run( self.is_stopping_event.wait() ) #This is the lifetime of this worker - #Fill in the ID fields of the tasks we didn't dare to check with db until now - from .models import Task - for func in Task.registered_tasks.values(): - task = func.task - if not task.pk: - try: task.pk = Task.objects.get( name = task.name ).pk - except Task.DoesNotExist: pass #It's a new one, it's fine. + except KeyboardInterrupt: + self.log.info(f"[W{self.model.id}] Worker Received KeyboardInterrupt, exiting...") - self.attach_django_signals() + except RuntimeError: + self.log.info(f"[W{self.model.id}] Worker Stopped, exiting...") - try: - self.loop.run_until_complete( self.is_stopping_event.wait() ) #This is the lifetime of this worker - except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...") - except RuntimeError: self.log.info(f"[Asyncron][W{self.model.id}] Worker Stopped, exiting...") - else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...") + else: + self.log.info(f"[W{self.model.id}] Worker exiting...") - self.loop.run_until_complete( self.graceful_shutdown() ) + #We must catch sigterm/KeyboardInterrupt multiple times: + # 1 - Stop accepting new traces & let self_aware traces know they are in gracetime. + # 2 - abort all + runner.run( self.graceful_shutdown() ) - try: - Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update( - status_reason = "Worker died during execution", - status = "A", worker_lock = None - ) - except: pass + runner.run( self.cleanup() ) - #if count: print(f"Had to cancel {count} task(s).") #cls.log.warning + try: + count = Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update( + status_reason = "Worker died during execution", + status = "A", worker_lock = None + ) + except Exception as e: + self.log.info(f"Marking traces as 'Aborted' Failed: {e}") + else: + if count: self.log.info(f"Marked {count} trace(s) as 'Aborted'.") # Almost always, a worker can delete it's model from the db, # But it seems that there is a sitation where despite Task.worker_lock being delete=SET_NULL, @@ -277,12 +240,13 @@ class AsyncronWorker: for attempt in range(3): try: self.model.delete() - except IntegrityError: + except IntegrityError as e: + self.log.warning(f"Deleting worker (self) failed {attempt+1} time(s): {e}") time.sleep( 0.1 ) else: break self.log.debug("Worker stopped working.") - #self.loop.call_soon(self.started.set) + def attach_django_signals( self ): django_name_to_signals = { @@ -319,12 +283,13 @@ class AsyncronWorker: continue #print( threading.current_thread(), signal_name, sender, signal, instance, kwargs ) - - asyncio.run_coroutine_threadsafe( - self.task_reason_jobs_queue.put( (task, f"Change ({signal_name}) on {instance}") ), - self.loop - ) - + try: + asyncio.run_coroutine_threadsafe( + self.task_reason_jobs_queue.put( (task, f"Change ({signal_name}) on {instance}") ), + self.loop + ) + except Exception as e: + self.log.warning(f"Model {task} Threadsafe execution Change ({signal_name}) on {instance} failed: {e}") @@ -334,34 +299,43 @@ class AsyncronWorker: if not await self.model.trace_set.aexists(): break await asyncio.sleep( attempt / 10 ) - else: self.log.info(f"[Asyncron][W{self.model.id}] Graceful shutdown not graceful enough!") + else: self.log.info(f"[W{self.model.id}] Graceful shutdown not graceful enough!") except: await asyncio.sleep( 1 ) - async def master_loop( self ): + async def master_main( self ): + """ + Fight over who's gonna be the master. + Prove your health in the process! + """ from .models import Worker, Task, Trace - #Delete dead masters every now and then! - last_overtake_attempt = 0 - current_master = False + is_current_master = False + next_overtake_attempt = time.time() + 1 + random.random() * 5 + loop_wait = 5 + MyQs = Worker.objects.filter( id = self.model.id ) + while await MyQs.aupdate( last_activity = timezone.now() ): - while True: try: await Worker.objects.filter( is_master = False ).aupdate( is_master = models.Q(id = self.model.id) ) + except RuntimeError as e: #Syntax Error cause: cannot schedule new futures after interpreter shutdown + self.log.critical(f"[W{self.model.id}] Master Loop Runtime Error:", e ) if "interpreter shutdown" not in e.args[0]: raise self.database_unreachable = True + loop_wait = 0 break except IntegrityError: # I'm not master! loop_wait = 5 + random.random() * 15 - if current_master: self.log.info(f"[Asyncron][W{self.model.id}] No longer master.") - current_master = False + if is_current_master: self.log.info(f"[W{self.model.id}] No longer master.") + is_current_master = False - if last_overtake_attempt + 60 < time.time(): - last_overtake_attempt = time.time() + #Deletes dead masters every now and then! + if next_overtake_attempt <= time.time(): + next_overtake_attempt = time.time() + 60 took_master = False if self.model.is_robust: @@ -370,15 +344,16 @@ class AsyncronWorker: else: await Worker.objects.filter( is_master = True ).filter( - models.Q(last_crowning_attempt = None) | models.Q(last_crowning_attempt__lte = timezone.now() - timezone.timedelta( minutes = 5 )) + models.Q( last_activity = None ) | + models.Q( last_activity__lte = timezone.now() - timezone.timedelta( minutes = 2 ) ) ).aupdate( is_master = False ) else: #I am Master! loop_wait = 2 + random.random() * 3 - if not current_master: self.log.info(f"[Asyncron][W{self.model.id}] Running as master.") - current_master = True + if not is_current_master: self.log.info(f"[W{self.model.id}] Running as master.") + is_current_master = True if not self.clearing_dead_workers: self.create_task( self.clear_dead_workers() ) @@ -386,10 +361,11 @@ class AsyncronWorker: await self.sync_tasks() await self.clear_orphaned_traces() - finally: - if not self.database_unreachable: - await Worker.objects.filter( id = self.model.id ).aupdate( last_crowning_attempt = timezone.now() ) - await asyncio.sleep( loop_wait ) + if loop_wait: + await asyncio.sleep( loop_wait ) + + else: + self.log.warning(f"[W{self.model.id}] Worker in Master Loop Cannot find it's corresponding database model!") async def clear_orphaned_traces( self ): from .models import Worker, Task, Trace @@ -399,11 +375,11 @@ class AsyncronWorker: self.clearing_dead_workers = True from .models import Worker, Task, Trace await Worker.objects.filter( - last_crowning_attempt__lte = timezone.now() - timezone.timedelta( seconds = 30 ), + last_activity__lte = timezone.now() - timezone.timedelta( seconds = 30 ), in_grace = False ).aupdate( in_grace = True ) - async for worker in Worker.objects.filter( in_grace = False, last_crowning_attempt = None ): + async for worker in Worker.objects.filter( in_grace = False, last_activity = None ): if not await sync_to_async( worker.is_proc_alive )(): await worker.adelete()