diff --git a/asyncron/apps.py b/asyncron/apps.py
index 2d2dc81..76ae4e6 100644
--- a/asyncron/apps.py
+++ b/asyncron/apps.py
@@ -25,10 +25,12 @@ class AsyncronConfig(AppConfig):
         self.load_model_auxilaries()
         self.load_extensions()
 
-        #Init the asyncron worker for this process
+        #Init and run the asyncron worker singleton for this process if it's not already running.
         from .workers import AsyncronWorker
-        #The worker should not start working until they know we're responding to requests.
-        AsyncronWorker.init()
+        if AsyncronWorker.IS_ACTIVE:
+            worker = AsyncronWorker()
+            worker.start_after_db_ready = True
+            worker.start( daemon = True )
 
     def import_per_app( self, names ):
         for app in apps.get_app_configs():
diff --git a/asyncron/gunicorn.py b/asyncron/gunicorn.py
index 53221f6..e2e87e7 100644
--- a/asyncron/gunicorn.py
+++ b/asyncron/gunicorn.py
@@ -12,27 +12,32 @@ def post_fork( server, worker ):
     #worker and AsyncronWorker, pay attention!
     post_fork.worker = worker
 
     from .workers import AsyncronWorker
-    AsyncronWorker.log = worker.log
-    AsyncronWorker.log.info("Asyncron worker attached.")
+    AsyncronWorker.IS_ACTIVE = True
+    AsyncronWorker.register_init_callback( _patch )
+
+def _patch( aworker ):
+    gserver = post_fork.server
+    gworker = post_fork.worker
+
+    if not gworker.reloader: return #So if reload = False
+
+    #Attach gunicorn reload event to asyncron_worker exit signals
+    original_callback = gworker.reloader._callback
+    def new_callback( *args, **kwargs ):
+        aworker.stop( reason = "Gunicorn Reload" )
+        return original_callback( *args, **kwargs )
+    gworker.reloader._callback = new_callback
+
+    aworker.log.setLevel( gworker.log.loglevel )
+    aworker.log.info( "Attached worker to gunicorn." )
+
-    init_to_override = AsyncronWorker.init
-    def init( *args, **kwargs ):
-        AsyncronWorker.MAX_COUNT = 1
-        AsyncronWorker.override_exit_signals()
-        if worker.reloader: #So if reload = True
-            to_override = worker.reloader._callback
-            def new_callback(*args, **kwargs):
-                AsyncronWorker.stop( reason = "Auto Reload" )
-                return to_override(*args, **kwargs)
-            worker.reloader._callback = new_callback
-        return init_to_override( *args, **kwargs )
-    AsyncronWorker.init = init
 
 # Keeping the worker in post_fork.worker so we can add extra files in it for it to track
-# TODO: Currently unfinished, since i just realized using the "inotify" support of gunicorn
+# LOW PRIORITY TODO: Currently unfinished, since I just realized using the "inotify" support of gunicorn
 # makes this reduntant, but still here is the relevant code if I want to also support the simpler
 # polling system
 # Should be in asyncron.app.ready
diff --git a/asyncron/management/commands/run_asyncron_worker.py b/asyncron/management/commands/run_asyncron_worker.py
index 6198a8a..68bc06d 100644
--- a/asyncron/management/commands/run_asyncron_worker.py
+++ b/asyncron/management/commands/run_asyncron_worker.py
@@ -29,15 +29,15 @@ class Command(BaseCommand):
     help = 'Start an Asyncorn Worker'
 
     def handle( self, *arg, **kwargs ):
-        AsyncronWorker.log = logging.getLogger(__name__)
+        AsyncronWorker.IS_ACTIVE = True
 
         while True:
-            worker = AsyncronWorker( daemon = False )
+            worker = AsyncronWorker()
            print( "Starting:", worker )
            try:
-                worker.start( is_robust = True )
+                worker.start()
            except Exception as e:
                print("Worker Died with an error! \nRestarting in 10 seconds, traceback:")
                print( traceback.format_exc() )
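Note on the _patch hook above: it only relies on gunicorn's private reloader attribute `_callback`, exactly as the diff shows. A minimal, framework-free sketch of the same wrap-then-delegate pattern (the Reloader class and stop_worker function here are stand-ins, not gunicorn API):

    #Stand-in for gunicorn's reloader; only the _callback attribute matters for this pattern.
    class Reloader:
        def __init__( self, callback ):
            self._callback = callback

    def stop_worker( reason ):
        print( f"Stopping worker: {reason}" )

    reloader = Reloader( lambda path: print( f"Reloading because {path} changed" ) )

    original_callback = reloader._callback
    def new_callback( *args, **kwargs ):
        stop_worker( reason = "Gunicorn Reload" )      #Cleanup runs first...
        return original_callback( *args, **kwargs )    #...then defer to the original behaviour
    reloader._callback = new_callback

    reloader._callback( "settings.py" )  #Prints the stop message, then the reload message

Capturing the previous callback in a closure (instead of mutating it in place) is what lets the new behaviour compose with whatever gunicorn already installed.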
diff --git a/asyncron/workers.py b/asyncron/workers.py
index 55e3e54..92e8af5 100644
--- a/asyncron/workers.py
+++ b/asyncron/workers.py
@@ -1,8 +1,12 @@
+
 from django.db import IntegrityError, models, close_old_connections
-from django.utils import timezone
+from django.db.backends import signals as django_signals
 from django.db.utils import OperationalError
+from django.utils import timezone
+
 from asgiref.sync import sync_to_async
+
 import os, signal
 import time
 import threading
 
@@ -13,11 +17,26 @@
 import random
 
 from .utils import retry_on_db_error, ignore_on_db_error
 
-class AsyncronWorker:
-    INSTANCES = [] #AsyncronWorker instance
-    MAX_COUNT = 0
+ASYNC_TASKS_CLEANUP_THRESHOLD = 64
 
-    EXIST_SIGNALS = [
+class AsyncronWorker:
+    """
+    The asyncron worker for a process, limited to one per process.
+    Other threads on the same process can offload their work to this thread and its single async event loop.
+    Alternatively, this worker can run in non-daemon mode and use the main thread.
+
+    The first case is meant to be used alongside the web server,
+    and the second case is meant for a standalone (more robust) task processor.
+    """
+
+    IS_ACTIVE = False #Whether this process needs an AsyncronWorker at all; prevents running the worker on everything that causes asyncron.app.ready to run!
+    INSTANCE = None #Singleton
+    INIT_CALLBACKS = [] #Once the singleton is created, it'll run through the callbacks with itself as the first arg.
+
+    THREAD_LOCK = threading.Lock() #For initial singleton creation.
+
+    EXIT_SIGNALS = [
         signal.SIGABRT,
         signal.SIGHUP,
         signal.SIGQUIT,
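The INSTANCE / THREAD_LOCK pair above exists to support the double-checked locking done in __new__ in the next hunk. A minimal sketch of that pattern in isolation:

    import threading

    class Singleton:
        INSTANCE = None
        THREAD_LOCK = threading.Lock()

        def __new__( cls ):
            #Fast path: no locking once the instance exists.
            if cls.INSTANCE: return cls.INSTANCE
            #Slow path: re-check under the lock, so two threads that both
            #passed the first check cannot both create an instance.
            with cls.THREAD_LOCK:
                if cls.INSTANCE: return cls.INSTANCE
                cls.INSTANCE = super().__new__( cls )
            return cls.INSTANCE

    assert Singleton() is Singleton()

One caveat of singletons built in __new__: Python still calls __init__ on every construction, so repeated AsyncronWorker() calls re-run __init__ against the shared instance.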
" Make sure you only call asyncron.AsyncronWorker.override_exit_signals once per process." ) continue - if to_override and callable(to_override): + if to_override and callable( to_override ): + def wrapped( signum, frame ): - cls.sigcatch( signum, frame ) - return to_override( signum, frame ) - wrapped.already_wrapped = True - cls.log.debug(f"Wrapped {to_override} inside sigcatch for {signal.strsignal(sig)}") + self.handle_exit_signal( signum, frame ) + return wrapped.wraps_callable( signum, frame ) + wrapped.wraps_callable = to_override + + self.log.debug(f"Wrapped '{to_override}' inside handle_exit_signal for: {signal.strsignal(sig)}") signal.signal(sig, wrapped) + else: - cls.log.debug(f"Direct sigcatch for {signal.strsignal(sig)}") - signal.signal(sig, cls.sigcatch) + self.log.debug(f"Directly listening for exit signal: {signal.strsignal(sig)}") + signal.signal(sig, self.handle_exit_signal) - @classmethod - def sigcatch( cls, signum, frame ): - cls.stop(f"Signal {signal.strsignal(signum)}") + def handle_exit_signal( self, signum, frame ): + self.stop(f"Signal {signal.strsignal(signum)}") - @classmethod - def stop( cls, reason = None ): - cls.log.info(f"[Asyncron] Stopping Worker(s): {reason}") - for worker in cls.INSTANCES: - if worker.is_stopping: continue - worker.is_stopping = True - worker.loop.call_soon_threadsafe(worker.loop.stop) + def handle_new_db_connection( self, sender, **kwargs ): + if self.is_db_ready: return + self.log.debug(f"First DB connection: {sender}") + from .models import Worker, Task, Trace - for worker in cls.INSTANCES: - if worker.thread.is_alive(): - worker.thread.join() + self.is_db_ready = True + django_signals.connection_created.disconnect( self.handle_new_db_connection ) - @classmethod - def init( cls ): - if len(cls.INSTANCES) < cls.MAX_COUNT: cls() - - #TODO: Use this to skip the 1 second delay in the self.start method on higher traffic servers. - #from django.db.backends.signals import connection_created - #from django.db.backends.postgresql.base import DatabaseWrapper - #from django.dispatch import receiver - #@receiver(connection_created, sender=DatabaseWrapper) - #def initial_connection_to_db(sender, **kwargs): - # if len(cls.INSTANCES) < cls.MAX_COUNT: cls() + if not self.loop: return + self.loop.call_soon_threadsafe( self.is_db_ready_event.set ) - - ## - ## Start of instance methods - ## - def __init__( self, daemon = True ): - self.INSTANCES.append(self) - self.is_stopping = False - self.clearing_dead_workers = False - self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks - self.work_loop_over = asyncio.Event() - self.database_unreachable = False - self.all_tasks = [] + def start( self, daemon = False ): + assert not self.thread, "This Worker has already been started once!" if daemon: - self.thread = threading.Thread( target = self.start ) + self.thread = threading.Thread( target = self.start_working ) self.thread.start() + return + + assert threading.main_thread() == threading.current_thread(), f"Cannot run a non daemon worker, in a thread other than main! 
@@ -139,7 +251,7 @@
         self.attach_django_signals()
 
         try:
-            self.loop.run_until_complete( self.work_loop_over.wait() ) #This is the lifetime of this worker
+            self.loop.run_until_complete( self.is_stopping_event.wait() ) #This is the lifetime of this worker
         except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...")
         except RuntimeError: self.log.info(f"[Asyncron][W{self.model.id}] Worker Stopped, exiting...")
         else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...")
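The run_until_complete( is_stopping_event.wait() ) line above is the entire lifetime of the worker: its thread parks on an asyncio.Event, and every other thread may only reach the loop through call_soon_threadsafe, mirroring stop(). A runnable miniature of that arrangement (all names here are illustrative, not the diff's API):

    import asyncio, threading

    loop_ready = threading.Event()
    state = {}

    def worker():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop( loop )
        stop_event = asyncio.Event()  #Created inside the thread that owns the loop
        state["loop"], state["stop"] = loop, stop_event
        loop_ready.set()
        loop.run_until_complete( stop_event.wait() )  #This is the lifetime of the worker
        print( "worker exited cleanly" )

    thread = threading.Thread( target = worker )
    thread.start()

    loop_ready.wait()  #Don't touch state until the worker has published its loop
    #Calling state["stop"].set() directly from this thread would race the loop;
    #call_soon_threadsafe hands the .set() over to the loop's own thread.
    state["loop"].call_soon_threadsafe( state["stop"].set )
    thread.join()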
@@ -169,17 +281,18 @@
                 time.sleep( 0.1 )
             else: break
 
+        self.log.debug("Worker stopped working.")
         #self.loop.call_soon(self.started.set)
 
     def attach_django_signals( self ):
-        django_signals = {
+        django_name_to_signals = {
             name : attr
             for name in ["post_save", "post_delete"] #TO Expand: dir(models.signals)
             if not name.startswith("_") #Dont get private stuff
             and ( attr := getattr(models.signals, name) ) #Just an assignment
             and isinstance( attr, models.signals.ModelSignal ) #Is a signal related to models!
         }
-        for name, signal in django_signals.items():
+        for name, signal in django_name_to_signals.items():
             signal.connect( functools.partial( self.model_changed, name ) )
 
         from .models import Task
@@ -336,24 +449,24 @@
                 await self.check_services()
                 await self.check_scheduled()
             except OperationalError as e:
-                self.log.warning(f"[Asyncron] DB Connection Error: {e}")
-                self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
+                self.log.warning(f"DB Connection Error: {e}")
+                self.log.warning(f"Traceback:\n{traceback.format_exc()}" )
                 self.check_interval = 60
                 #break
             except Exception as e:
-                self.log.warning(f"[Asyncron] check_scheduled failed: {e}")
-                self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
+                self.log.warning(f"check_scheduled failed: {e}")
+                self.log.warning(f"Traceback:\n{traceback.format_exc()}" )
                 self.check_interval = 20
 
             try: await sync_to_async( close_old_connections )()
             except Exception as e:
-                self.log.warning(f"[Asyncron] close_old_connections failed: {e}")
-                self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
+                self.log.warning(f"close_old_connections failed: {e}")
+                self.log.warning(f"Traceback:\n{traceback.format_exc()}" )
                 #break
 
-        self.work_loop_over.set()
+        self.is_stopping_event.set()
 
     async def consume_task_reason_jobs_queue( self ):
         while True:
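The tail of the loop above turns failures into a longer check_interval and runs housekeeping (close_old_connections) every pass instead of crashing out. A distilled sketch of that keep-running-and-back-off shape (function and interval values are illustrative, not the diff's):

    import asyncio, logging, traceback

    log = logging.getLogger( "sketch" )

    async def check_loop( do_checks, do_housekeeping, stop_event ):
        interval = 5
        while not stop_event.is_set():
            try:
                await do_checks()
                interval = 5                      #Healthy pass: normal cadence
            except Exception:
                log.warning( f"Checks failed:\n{traceback.format_exc()}" )
                interval = 20                     #Failed pass: back off, but keep running
            try:
                await do_housekeeping()           #The diff's close_old_connections slot
            except Exception:
                log.warning( f"Housekeeping failed:\n{traceback.format_exc()}" )
            try:
                #Sleep for interval, but wake immediately if stop_event fires meanwhile.
                await asyncio.wait_for( stop_event.wait(), timeout = interval )
            except asyncio.TimeoutError:
                pass

    async def main():
        stop = asyncio.Event()
        async def checks(): pass
        async def housekeeping(): pass
        asyncio.get_running_loop().call_later( 0.2, stop.set )
        await check_loop( checks, housekeeping, stop )

    asyncio.run( main() )

Waiting on the stop event with a timeout, instead of a plain sleep, is what lets stop() end the loop mid-interval without the worker blocking for the full back-off period.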