diff --git a/asyncron/admin.py b/asyncron/admin.py index 12112e8..df81315 100644 --- a/asyncron/admin.py +++ b/asyncron/admin.py @@ -40,9 +40,9 @@ class WorkerAdmin( BaseModelAdmin ): class TaskAdmin( BaseModelAdmin ): order = 1 list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged_executions', 'last_execution', 'scheduled' - fields = ["name", "description", "type", "jitter"] + fields = ["name", "description", "type", "on_model_change", "jitter"] - actions = 'schedule_execution', 'execution_now', + actions = 'schedule_execution', 'execution_now', 'delete_script_missing' def has_add_permission( self, request, obj = None ): return False def has_delete_permission( self, request, obj = None ): return False def has_change_permission( self, request, obj = None ): return False @@ -76,6 +76,10 @@ class TaskAdmin( BaseModelAdmin ): return ", ".join( results ) if results else "Callable" + def on_model_change( self, obj ): + try: return ", ".join( f"{m.__module__}.{m.__name__}" for m in obj.registered_tasks[obj.name].watching_models ) + except: return "N/A" + def logged_executions( self, obj ): return obj.trace_set.exclude( status = "S" ).count() @@ -112,6 +116,12 @@ class TaskAdmin( BaseModelAdmin ): results = asyncio.run( Trace.objects.filter( id__in = trace_ids ).gather_method( 'start' ) ) self.explain_gather_results( request, results, 5 ) + @admin.action( description = "Delete tasks with missing scripts" ) + def delete_script_missing( self, request, qs ): + for task in qs: + if task.name not in task.registered_tasks: + task.delete() + class TraceAppFilter(admin.SimpleListFilter): title = "app" diff --git a/asyncron/apps.py b/asyncron/apps.py index 7e6d06a..e6b2388 100644 --- a/asyncron/apps.py +++ b/asyncron/apps.py @@ -41,6 +41,8 @@ class AsyncronConfig(AppConfig): def load_extensions( self ): from .base.models import BaseModel from .extender import Extender + from .gunicorn import post_fork #To add them to auto-reload watch + for app in apps.get_app_configs(): app_dir = pathlib.Path(app.path) for model in app.get_models(): @@ -60,6 +62,10 @@ class AsyncronConfig(AppConfig): loader = importlib.machinery.SourceFileLoader( f"{app.name}.extensions.{model.__name__}.{import_file.stem}", str(import_file) ) loader.exec_module( types.ModuleType(loader.name) ) + if hasattr( post_fork, "worker" ): + try: post_fork.worker.reloader.add_extra_file( str(import_file) ) + except: pass + if extender: extender.attach( model ) diff --git a/asyncron/workers.py b/asyncron/workers.py index 4a9aa0d..c775900 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -1,5 +1,6 @@ from django.db import IntegrityError, models, close_old_connections from django.utils import timezone +from django.db.utils import OperationalError from asgiref.sync import sync_to_async import os, signal @@ -50,7 +51,7 @@ class AsyncronWorker: @classmethod def stop( cls, reason = None ): - cls.log.info(f"Stopping AsyncronWorker(s): {reason}") + cls.log.info(f"[Asyncron] Stopping Worker(s): {reason}") for worker in cls.INSTANCES: if worker.is_stopping: continue worker.is_stopping = True @@ -82,6 +83,8 @@ class AsyncronWorker: self.is_stopping = False self.clearing_dead_workers = False self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks + self.work_loop_over = asyncio.Event() + if daemon: self.thread = threading.Thread( target = self.start ) self.thread.start() @@ -97,7 +100,7 @@ class AsyncronWorker: #Fight over who's gonna be the master, prove your health in the process! self.loop.create_task( self.master_loop() ) - self.loop.create_task( self.work_loop() ) + main_task = self.loop.create_task( self.work_loop() ) time.sleep(0.3) #To avoid the django initialization warning! self.model.save() @@ -115,9 +118,10 @@ class AsyncronWorker: try: self.loop.run_forever() #This is the lifetime of this worker - except KeyboardInterrupt: - print("Received exit, exiting") + except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...") + else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...") + self.loop.run_until_complete( self.graceful_shutdown() ) count = Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update( status_reason = "Worker died during execution", @@ -154,6 +158,14 @@ class AsyncronWorker: self.loop ) + async def graceful_shutdown( self ): + try: + for attempt in range( 10 ): + if not await self.model.trace_set.aexists(): + break + await asyncio.sleep( attempt / 10 ) + else: self.log.info(f"[Asyncron][W{self.model.id}] Graceful shutdown not graceful enough!") + except: await asyncio.sleep( 1 ) async def master_loop( self ): @@ -249,15 +261,23 @@ class AsyncronWorker: try: await self.check_scheduled() await sync_to_async( close_old_connections )() + except OperationalError as e: + self.log.warning(f"[Asyncron] DB Connection Error: {e}") + print( traceback.format_exc() ) + break + except Exception as e: self.log.warning(f"[Asyncron] check_scheduled failed: {e}") print( traceback.format_exc() ) self.check_interval = 20 + self.work_loop_over.set() + async def check_scheduled( self ): from .models import Task, Trace + #Schedule traces that aren't yet set. Ts = Task.objects.exclude( interval = None ).exclude( trace__status = "S" ).exclude( worker_type = "D" if self.model.is_robust else "R" ) @@ -266,7 +286,11 @@ class AsyncronWorker: trace = task.new_trace() await trace.reschedule( reason = "Auto Scheduled" ) - locked = await Task.objects.filter( id = task.id, worker_lock = None ).aupdate( worker_lock = self.model ) + locked = await Task.objects.filter( id = task.id ).filter( + models.Q(worker_lock = None) | + models.Q(worker_lock = self.model) #This is incase the lock has been aquired for some reason before. + ).aupdate( worker_lock = self.model ) + if locked: await trace.asave() await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None ) diff --git a/setup.py b/setup.py index 6665f84..8fc353c 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='asyncron', - version='0.1.5', + version='0.1.6', packages=find_packages(), #include_package_data=True, # Include static files from MANIFEST.in install_requires=[