Cleaned up and overhauled parallelism. (part 1)
This commit is contained in:
parent
271bb1f170
commit
429c694629
@@ -25,10 +25,12 @@ class AsyncronConfig(AppConfig):
         self.load_model_auxilaries()
         self.load_extensions()

-        #Init the asyncron worker for this process
+        #Init and run the asyncron worker singleton for this process if it's not already running.
         from .workers import AsyncronWorker
+        #The worker should not start working until it knows we're responding to requests.
-        AsyncronWorker.init()
+        if AsyncronWorker.IS_ACTIVE:
+            worker = AsyncronWorker()
+            worker.start_after_db_ready = True
+            worker.start( daemon = True )

     def import_per_app( self, names ):
         for app in apps.get_app_configs():
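A note on the hunk above: Django runs AppConfig.ready() in every process that sets up the app registry (runserver, migrate, shell, the autoreloader parent), which is why the worker start is gated behind IS_ACTIVE. A minimal standalone sketch of the same guard pattern, with hypothetical names outside the Django API:

import threading
from django.apps import AppConfig

class ExampleConfig(AppConfig):          # hypothetical app config
    name = "example"
    START_WORKER = False                 # flipped only by processes that want a worker

    def ready(self):
        # ready() also fires for migrate/shell/autoreload parents,
        # so the background thread is gated behind an explicit opt-in flag.
        if not ExampleConfig.START_WORKER:
            return
        threading.Thread(target=lambda: None, daemon=True).start()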
@@ -12,27 +12,32 @@ def post_fork( server, worker ): #worker and AsyncronWorker, pay attention!
     post_fork.worker = worker

     from .workers import AsyncronWorker
     AsyncronWorker.log = worker.log
     AsyncronWorker.log.info("Asyncron worker attached.")
+    AsyncronWorker.IS_ACTIVE = True
+    AsyncronWorker.register_init_callback( _patch )

-init_to_override = AsyncronWorker.init
-def init( *args, **kwargs ):
-    AsyncronWorker.MAX_COUNT = 1
-    AsyncronWorker.override_exit_signals()
-
-    if worker.reloader: #So if reload = True
-        to_override = worker.reloader._callback
-        def new_callback( *args, **kwargs ):
-            AsyncronWorker.stop( reason = "Auto Reload" )
-            return to_override(*args, **kwargs)
-        worker.reloader._callback = new_callback
-
-    return init_to_override( *args, **kwargs )
-AsyncronWorker.init = init
+def _patch( aworker ):
+    gserver = post_fork.server
+    gworker = post_fork.worker
+
+    if not gworker.reloader: return #So if reload = False
+
+    #Attach the gunicorn reload event to the asyncron worker's exit signals
+    original_callback = gworker.reloader._callback
+    def new_callback( *args, **kwargs ):
+        aworker.stop( reason = "Gunicorn Reload" )
+        return original_callback( *args, **kwargs )
+    gworker.reloader._callback = new_callback
+
+    aworker.log.setLevel( gworker.log.loglevel )
+    aworker.log.info( "Attached worker to gunicorn." )

 # Keeping the worker in post_fork.worker so we can add extra files to it for it to track
-# TODO: Currently unfinished, since I just realized using the "inotify" support of gunicorn
+# LOW PRIORITY TODO: Currently unfinished, since I just realized using the "inotify" support of gunicorn
 # makes this redundant, but still, here is the relevant code if I want to also support the simpler
 # polling system
+# Should be in asyncron.app.ready
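For context, post_fork is one of gunicorn's server hooks: if a function with that name exists in the gunicorn config module, it is called in each worker process right after the fork, which makes it the natural place to attach per-process machinery like the asyncron worker. A hedged sketch of the wiring (file name assumed; any module passed via -c works):

# gunicorn.conf.py (name assumed)
bind = "127.0.0.1:8000"
workers = 2

def post_fork(server, worker):
    # Runs in the child right after fork.
    worker.log.info("forked worker %s", worker.pid)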
@@ -29,15 +29,15 @@ class Command(BaseCommand):
     help = 'Start an Asyncron Worker'

     def handle( self, *arg, **kwargs ):
         AsyncronWorker.log = logging.getLogger(__name__)
         AsyncronWorker.IS_ACTIVE = True

         while True:
-            worker = AsyncronWorker( daemon = False )
+            worker = AsyncronWorker()
             print( "Starting:", worker )

             try:
-                worker.start( is_robust = True )
+                worker.start()
             except Exception as e:
                 print("Worker died with an error! Restarting in 10 seconds, traceback:")
                 print( traceback.format_exc() )
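The message above promises a restart in 10 seconds, but the pause itself sits outside the shown hunk. A hedged sketch of the overall crash-restart loop the command implements (the sleep and the run_worker stand-in are illustrative, not code from this commit):

import time
import traceback

def run_worker():
    raise RuntimeError("boom")           # stand-in for worker.start()

while True:
    try:
        run_worker()
    except Exception:
        print("Worker died with an error! Restarting in 10 seconds, traceback:")
        print(traceback.format_exc())
        time.sleep(10)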
@@ -1,8 +1,12 @@
 from django.db import IntegrityError, models, close_old_connections
-from django.utils import timezone
+from django.db.backends import signals as django_signals
 from django.db.utils import OperationalError
+from django.utils import timezone

+from asgiref.sync import sync_to_async


 import os, signal
 import time
+import threading
@@ -13,11 +17,26 @@ import random

 from .utils import retry_on_db_error, ignore_on_db_error

-class AsyncronWorker:
-    INSTANCES = [] #AsyncronWorker instances
-    MAX_COUNT = 0
-    ASYNC_TASKS_CLEANUP_THRESHOLD = 64
-
-    EXIST_SIGNALS = [
+class AsyncronWorker:
+    """
+    The asyncron worker for a process, limited to one per process.
+    Other threads in the same process can offload their work to this thread and its single async event loop.
+    Alternatively, this worker can run in non-daemon mode and use the main thread.
+
+    The first case is meant to be used alongside the web server,
+    and the second case is meant for a standalone (more robust) task processor.
+    """
+
+    IS_ACTIVE = False #Whether this process needs an AsyncronWorker at all; this prevents running the worker from everything that causes asyncron.app.ready to run!
+    INSTANCE = None #Singleton
+    INIT_CALLBACKS = [] #Once the singleton is created, it'll run through these callbacks with itself as the first arg.
+
+    THREAD_LOCK = threading.Lock() #For initial singleton creation.
+
+    EXIT_SIGNALS = [
         signal.SIGABRT,
         signal.SIGHUP,
         signal.SIGQUIT,
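Based on the docstring above, the two modes would be used roughly like this (import path assumed from the `from .workers import AsyncronWorker` lines elsewhere in this diff; only one mode can be started per process since the worker is a singleton):

from asyncron.workers import AsyncronWorker  # import path assumed

# Mode 1: daemon thread beside a web server process
AsyncronWorker.IS_ACTIVE = True
worker = AsyncronWorker()
worker.start(daemon=True)       # spawns a thread running start_working()

# Mode 2: standalone, more robust processor on the main thread
# worker.start(daemon=False)    # blocks; runs start_working(is_robust=True)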
@@ -26,108 +45,201 @@ class AsyncronWorker:
     ]

     @classmethod
-    def override_exit_signals( cls ):
-        for sig in cls.EXIST_SIGNALS:
+    def register_init_callback( cls, callback ):
+        assert not cls.INSTANCE, "Cannot register new callbacks after the worker is created!"
+        cls.INIT_CALLBACKS.append( callback )

+    def __new__( cls, *args, **kwargs ):
+        """
+        Thread safe singleton logic.
+        """
+
+        if cls.INSTANCE: return cls.INSTANCE
+        with cls.THREAD_LOCK:
+            if cls.INSTANCE: return cls.INSTANCE
+            cls.INSTANCE = super().__new__( cls, *args, **kwargs )
+            for callback in cls.INIT_CALLBACKS: callback( cls.INSTANCE )
+            cls.INSTANCE.log #Evaluating the log property while we have the lock
+            cls.INSTANCE.register_with_exit_signals()
+
+            django_signals.connection_created.connect( cls.INSTANCE.handle_new_db_connection )
+            cls.INSTANCE.log.debug("Worker created for this process.")
+        return cls.INSTANCE
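A quick illustration of the contract these two methods establish: callbacks registered before the first construction run exactly once, on the singleton, while the creation lock is held; later constructions return the same object (calls mirror the new API in this commit; the callback itself is hypothetical):

def announce(worker):                        # hypothetical callback
    print("worker singleton created:", worker)

AsyncronWorker.register_init_callback(announce)
w1 = AsyncronWorker()   # announce(w1) fires here, inside THREAD_LOCK
w2 = AsyncronWorker()   # same instance; no callback re-run
assert w1 is w2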

+    @property
+    def log( self ):
+        if hasattr( self, '_log' ): return self._log
+        log = logging.getLogger("asyncron.worker")
+        handler = logging.StreamHandler()
+        #Taken from gunicorn, so it looks similar in the merged output
+        formatter = logging.Formatter(
+            r"%(asctime)s [%(process)d] [%(levelname)s] [Asyncron] %(message)s",
+            r"[%Y-%m-%d %H:%M:%S %z]"
+        )
+        handler.setFormatter(formatter)
+        log.addHandler(handler)
+        log.setLevel(logging.DEBUG)
+        self._log = log
+        return log
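Since the comment says the format strings are taken from gunicorn, log lines from both sources line up in merged output. A self-contained snippet showing the resulting shape:

import logging

log = logging.getLogger("demo")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    r"%(asctime)s [%(process)d] [%(levelname)s] [Asyncron] %(message)s",
    r"[%Y-%m-%d %H:%M:%S %z]",
))
log.addHandler(handler)
log.setLevel(logging.DEBUG)
log.debug("Worker created for this process.")
# e.g. [2024-05-01 12:00:00 +0000] [12345] [DEBUG] [Asyncron] Worker created for this process.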

+    def register_with_exit_signals( self ):
+        """
+        Hooks this worker into EXIT_SIGNALS without messing up other handlers downstream,
+        resulting in self.handle_exit_signal being called on exit signals.
+        """
+        for sig in self.EXIT_SIGNALS:
             to_override = signal.getsignal(sig)
-            if getattr(to_override, "already_wrapped", False):
-                cls.log.warning(
+            if hasattr(to_override, "wraps_callable"):
+                self.log.warning(
                     f"An attempt was made to wrap around the {signal.strsignal(sig)} signal again!"
-                    " Make sure you only call asyncron.AsyncronWorker.override_exit_signals once per process."
                 )
                 continue

             if to_override and callable( to_override ):

                 def wrapped( signum, frame ):
-                    cls.sigcatch( signum, frame )
-                    return to_override( signum, frame )
-                wrapped.already_wrapped = True
-                cls.log.debug(f"Wrapped {to_override} inside sigcatch for {signal.strsignal(sig)}")
+                    self.handle_exit_signal( signum, frame )
+                    return wrapped.wraps_callable( signum, frame )
+                wrapped.wraps_callable = to_override
+
+                self.log.debug(f"Wrapped '{to_override}' inside handle_exit_signal for: {signal.strsignal(sig)}")
                 signal.signal(sig, wrapped)

             else:
-                cls.log.debug(f"Direct sigcatch for {signal.strsignal(sig)}")
-                signal.signal(sig, cls.sigcatch)
+                self.log.debug(f"Directly listening for exit signal: {signal.strsignal(sig)}")
+                signal.signal(sig, self.handle_exit_signal)
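A standalone sketch of the chaining idea used above (independent of this commit): look up the current handler, call your hook first, then delegate, and tag the wrapper so a second pass can detect it and avoid double-wrapping. Only real stdlib APIs are used; must run on the main thread, as signal.signal requires:

import signal

def chain(sig, hook):
    previous = signal.getsignal(sig)
    def wrapped(signum, frame):
        hook(signum, frame)
        if callable(wrapped.wraps_callable):
            wrapped.wraps_callable(signum, frame)   # delegate downstream
    wrapped.wraps_callable = previous    # marker + the delegation target
    signal.signal(sig, wrapped)

chain(signal.SIGTERM, lambda signum, frame: print("shutting down"))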

-    @classmethod
-    def sigcatch( cls, signum, frame ):
-        cls.stop(f"Signal {signal.strsignal(signum)}")
+    def handle_exit_signal( self, signum, frame ):
+        self.stop(f"Signal {signal.strsignal(signum)}")

-    @classmethod
-    def stop( cls, reason = None ):
-        cls.log.info(f"[Asyncron] Stopping Worker(s): {reason}")
-        for worker in cls.INSTANCES:
-            if worker.is_stopping: continue
-            worker.is_stopping = True
-            worker.loop.call_soon_threadsafe(worker.loop.stop)
-
-        for worker in cls.INSTANCES:
-            if worker.thread.is_alive():
-                worker.thread.join()
-
-    @classmethod
-    def init( cls ):
-        if len(cls.INSTANCES) < cls.MAX_COUNT: cls()
-
-        #TODO: Use this to skip the 1 second delay in the self.start method on higher traffic servers.
-        #from django.db.backends.signals import connection_created
-        #from django.db.backends.postgresql.base import DatabaseWrapper
-        #from django.dispatch import receiver
-        #@receiver(connection_created, sender=DatabaseWrapper)
-        #def initial_connection_to_db(sender, **kwargs):
-        #    if len(cls.INSTANCES) < cls.MAX_COUNT: cls()
+    def handle_new_db_connection( self, sender, **kwargs ):
+        if self.is_db_ready: return
+        self.log.debug(f"First DB connection: {sender}")
+        from .models import Worker, Task, Trace
+
+        self.is_db_ready = True
+        django_signals.connection_created.disconnect( self.handle_new_db_connection )
+
+        if not self.loop: return
+        self.loop.call_soon_threadsafe( self.is_db_ready_event.set )
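For reference, connection_created is a real Django signal (django.db.backends.signals) sent whenever a new database connection is opened; the handler above uses it as a one-shot "DB is reachable" notification. A minimal standalone version of that pattern:

from django.db.backends.signals import connection_created

def on_first_connection(sender, connection, **kwargs):
    print("first DB connection:", connection.alias)
    connection_created.disconnect(on_first_connection)   # one-shot receiver

connection_created.connect(on_first_connection)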

+    ##
+    ## Start of instance methods
+    ##

-    def __init__( self, daemon = True ):
-        self.INSTANCES.append(self)
-        self.is_stopping = False
-        self.clearing_dead_workers = False
-        self.watching_models = collections.defaultdict( set ) # Model -> Set of key names of the tasks
-        self.work_loop_over = asyncio.Event()
-        self.database_unreachable = False
-        self.all_tasks = []

+    def start( self, daemon = False ):
+        assert not self.thread, "This Worker has already been started once!"
+
+        if daemon:
-            self.thread = threading.Thread( target = self.start )
+            self.thread = threading.Thread( target = self.start_working )
+            self.thread.start()
+            return
+
+        assert threading.main_thread() == threading.current_thread(), f"Cannot run a non-daemon worker in a thread other than main! Current Thread: {threading.current_thread()}"
+        self.thread = threading.current_thread()
+        self.start_working( is_robust = True )

+    def stop( self, reason = None ):
+        if self.is_stopping: return #TODO: Insisting on exiting faster should probably be managed in the signal handler
+
+        self.log.info(f"Stopping Worker: {reason}")
+        self.is_stopping = True
+
+        if not self.loop: return
+        self.loop.call_soon_threadsafe( self.is_stopping_event.set )

+    ##
+    ## North of here, all methods are to be run in the main thread.
+    ## South of here, all methods except __init__ potentially run in another thread,
+    ## with its own dedicated async event loop.
+    ##
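The stop() method above shows the one safe way to poke an event loop from another thread: asyncio primitives are not thread-safe, so the Event is set via call_soon_threadsafe. A self-contained demo of the same handoff:

import asyncio, threading

async def main():
    stopping = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Simulate stop() arriving from another thread:
    threading.Timer(0.1, lambda: loop.call_soon_threadsafe(stopping.set)).start()
    await stopping.wait()       # the "lifetime" wait, as in start_working
    print("stopped")

asyncio.run(main())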

+    def __init__( self ):
+
+        #Used only inside the create_task method
+        self.tasks_next_cleanup = ASYNC_TASKS_CLEANUP_THRESHOLD
+        self.tasks_running = []
+
+        #These booleans have asyncio.Event counterparts in the loop context
+        self.is_db_ready = False
+        self.is_stopping = False
+
+        #Just so that asyncron.apps.ready doesn't trigger the django warning
+        self.start_after_db_ready = False
+
+        #Parallelism
+        self.thread = None #Once the worker starts, it'll be populated
+        self.loop = None #Once the event loop is ready, it'll be populated
+
+        self.clearing_dead_workers = False
+        self.watching_models = collections.defaultdict( set ) # Model -> Set of key names of the tasks
+
+        self.database_unreachable = False

+    def event_loop_init( self ):
+        assert not self.loop, "This worker already has a running event loop!"
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop( self.loop )
+
+        #These asyncio.Events have boolean counterparts in the __init__ section, for main-thread non-async logic
+        self.is_db_ready_event = asyncio.Event()
+        self.is_stopping_event = asyncio.Event()
+
+        self.task_reason_jobs_queue = asyncio.Queue() #Run tasks from other threads, safely

+    def create_task( self, coro, *, silent = False, name = None, context = None ):
+
+        if not silent:
-            origina_coro = coro
-            async def coro():
-                try: return await origina_coro
-                except:
-                    self.log.warning(f"[Asyncron] Task Error {origina_coro} {name}:\n{traceback.format_exc()}" )
+            async def wrapped():
+                try:
+                    return await wrapped.coro
+                except KeyboardInterrupt as e:
+                    self.log.debug(f"Task {wrapped.coro} {name} Interrupted with {e}." )
+                    raise
+                except:
+                    self.log.warning(f"Task Error {wrapped.coro} {name}:\n{traceback.format_exc()}" )
+                    raise
+            wrapped.coro = coro
+            coro = wrapped

-        self.all_tasks.append( self.loop.create_task( coro(), name = name, context = context ) )
+        task = self.loop.create_task( coro(), name = name, context = context )
+        self.tasks_running.append( task )

-        for task in list(self.all_tasks):
-            if task.done(): self.all_tasks.remove(task)
+        if self.tasks_next_cleanup < len(self.tasks_running):
+            task_count = len(self.tasks_running)
+            for t in list(self.tasks_running):
+                if t.done(): self.tasks_running.remove(t)
+            self.tasks_next_cleanup = ASYNC_TASKS_CLEANUP_THRESHOLD + len(self.tasks_running)
+            self.log.debug(
+                f"Cleaned up {task_count - len(self.tasks_running)} of {task_count} tasks,"
+                f" next cleanup at: {self.tasks_next_cleanup}"
+            )
+
+        return task
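The threshold logic above keeps create_task amortized: the running-task list is only swept once it has grown ASYNC_TASKS_CLEANUP_THRESHOLD entries past the size the last sweep left behind, instead of being scanned on every call. An illustrative toy version (not code from this commit):

import asyncio

async def demo():
    tasks, THRESHOLD = [], 64
    next_cleanup = THRESHOLD
    for _ in range(300):
        tasks.append(asyncio.create_task(asyncio.sleep(0)))
        if next_cleanup < len(tasks):
            tasks = [t for t in tasks if not t.done()]
            next_cleanup = THRESHOLD + len(tasks)
        await asyncio.sleep(0)   # give the zero-length tasks a chance to finish
    print(len(tasks), "entries still tracked")

asyncio.run(demo())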

-    def start( self, is_robust = False ):
-        assert not hasattr(self, "loop"), "This worker is already running!"
+    def start_working( self, is_robust = False ):
+        self.event_loop_init()
+
+        if self.start_after_db_ready:
+            self.log.debug("Waiting on another module to create the first database connection...")
+            self.loop.run_until_complete( self.is_db_ready_event.wait() )
+
         from .models import Worker, Task, Trace

         self.model = Worker( pid = os.getpid(), thread_id = threading.get_ident(), is_robust = is_robust )
-        self.loop = asyncio.new_event_loop()
-        asyncio.set_event_loop( self.loop )
-
-        #Run tasks from other threads, safely
-        self.task_reason_jobs_queue = asyncio.Queue()
         self.create_task( self.consume_task_reason_jobs_queue() )

         #Fight over who's gonna be the master, prove your health in the process!
         self.create_task( retry_on_db_error(self.master_loop)() )
         main_task = self.create_task( self.work_loop() )

-        time.sleep(0.3) #To avoid the django initialization warning!
+        #time.sleep(0.3) #To avoid the django initialization warning!
         self.model.save()
         self.model.refresh_from_db()

         #Fill in the ID fields of the tasks we didn't dare to check with the db until now
         from .models import Task
         for func in Task.registered_tasks.values():

@@ -139,7 +251,7 @@ class AsyncronWorker:
         self.attach_django_signals()

         try:
-            self.loop.run_until_complete( self.work_loop_over.wait() ) #This is the lifetime of this worker
+            self.loop.run_until_complete( self.is_stopping_event.wait() ) #This is the lifetime of this worker
         except KeyboardInterrupt: self.log.info(f"[Asyncron][W{self.model.id}] Worker Received KeyboardInterrupt, exiting...")
         except RuntimeError: self.log.info(f"[Asyncron][W{self.model.id}] Worker Stopped, exiting...")
         else: self.log.info(f"[Asyncron][W{self.model.id}] Worker exiting...")

@@ -169,17 +281,18 @@ class AsyncronWorker:
             time.sleep( 0.1 )
         else: break

         self.log.debug("Worker stopped working.")
         #self.loop.call_soon(self.started.set)

     def attach_django_signals( self ):
-        django_signals = {
+        django_name_to_signals = {
             name : attr
             for name in ["post_save", "post_delete"] #To expand: dir(models.signals)
             if not name.startswith("_") #Don't get private stuff
             and ( attr := getattr(models.signals, name) ) #Just an assignment
             and isinstance( attr, models.signals.ModelSignal ) #Is a signal related to models!
         }
-        for name, signal in django_signals.items():
+        for name, signal in django_name_to_signals.items():
             signal.connect( functools.partial( self.model_changed, name ) )

         from .models import Task
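The rename above also undoes a shadowing problem: the old local `django_signals` hid the module-level `django_signals` import. One further subtlety around `signal.connect(functools.partial(...))`: Django holds weak references to receivers by default, and a partial object with no other reference can be garbage collected, silently dropping the handler. A standalone sketch that pins the receiver with weak=False (real Django API; the print handler is illustrative):

import functools
from django.db.models import signals

def model_changed(signal_name, sender, **kwargs):
    print(signal_name, "fired for", sender.__name__)

for name in ("post_save", "post_delete"):
    getattr(signals, name).connect(
        functools.partial(model_changed, name),
        weak=False,   # keep the partial alive; Django holds weak refs by default
    )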

@@ -336,24 +449,24 @@ class AsyncronWorker:
                 await self.check_services()
                 await self.check_scheduled()
             except OperationalError as e:
-                self.log.warning(f"[Asyncron] DB Connection Error: {e}")
-                self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
+                self.log.warning(f"DB Connection Error: {e}")
+                self.log.warning(f"Traceback:\n{traceback.format_exc()}" )
                 self.check_interval = 60 #break

             except Exception as e:
-                self.log.warning(f"[Asyncron] check_scheduled failed: {e}")
-                self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
+                self.log.warning(f"check_scheduled failed: {e}")
+                self.log.warning(f"Traceback:\n{traceback.format_exc()}" )
                 self.check_interval = 20

             try:
                 await sync_to_async( close_old_connections )()
             except Exception as e:
-                self.log.warning(f"[Asyncron] close_old_connections failed: {e}")
-                self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
+                self.log.warning(f"close_old_connections failed: {e}")
+                self.log.warning(f"Traceback:\n{traceback.format_exc()}" )
                 #break

-        self.work_loop_over.set()
+        self.is_stopping_event.set()

     async def consume_task_reason_jobs_queue( self ):
         while True:
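On the `sync_to_async(close_old_connections)()` call above: close_old_connections is synchronous and touches per-thread connection state, so from inside the event loop it is hopped onto a worker thread via asgiref rather than called directly. The standalone shape of the pattern (requires configured Django settings to actually run):

import asyncio
from asgiref.sync import sync_to_async
from django.db import close_old_connections

async def housekeeping():
    # Runs the sync call in asgiref's executor thread instead of blocking the loop.
    await sync_to_async(close_old_connections)()

# asyncio.run(housekeeping())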