From 7dfe29b10da1e7cfd396fd134d8b30341b306738 Mon Sep 17 00:00:00 2001 From: Oracle Date: Thu, 13 Mar 2025 19:56:45 +0100 Subject: [PATCH] Better logging, fewer crashes --- .../commands/run_asyncron_worker.py | 23 ++++++++++++++--- asyncron/models.py | 9 +++---- asyncron/utils.py | 23 +++++++++++++++++ asyncron/workers.py | 25 +++++++++++-------- 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/asyncron/management/commands/run_asyncron_worker.py b/asyncron/management/commands/run_asyncron_worker.py index 166fb74..4eb0e63 100644 --- a/asyncron/management/commands/run_asyncron_worker.py +++ b/asyncron/management/commands/run_asyncron_worker.py @@ -4,7 +4,7 @@ # ## -import logging +import traceback, logging import asyncio import time @@ -31,9 +31,24 @@ class Command(BaseCommand): def handle( self, *arg, **kwargs ): AsyncronWorker.log = logging.getLogger(__name__) - worker = AsyncronWorker( daemon = False ) - print( "Starting:", worker ) - worker.start( is_robust = True ) + while True: + + worker = AsyncronWorker( daemon = False ) + print( "Starting:", worker ) + + try: + worker.start( is_robust = True ) + except Exception as e: + print("Worker Died with an error! 
Restarting in 10 seconds, traceback:") + print( traceback.format_exc() ) + + #TODO: worker.cleanup_tasks() + #worker.INSTANCES.remove( worker ) + #del worker + time.sleep( 10 ) + + else: break + diff --git a/asyncron/models.py b/asyncron/models.py index aef5153..a2744c7 100644 --- a/asyncron/models.py +++ b/asyncron/models.py @@ -203,15 +203,14 @@ class Trace( BaseModel ): except Exception as e: self.set_status( "E", f"Exception: {e}" ) self.stderr = traceback.format_exc() - + else: self.set_status( "C" ) self.returned = output - finally: - self.commit_on_new_print_task.cancel() - self.last_end_datetime = timezone.now() - await self.asave() + self.commit_on_new_print_task.cancel() + self.last_end_datetime = timezone.now() + await self.asave() ### Runtime methods for self aware tasks diff --git a/asyncron/utils.py b/asyncron/utils.py index 89aa672..c4bf9ba 100644 --- a/asyncron/utils.py +++ b/asyncron/utils.py @@ -8,3 +8,26 @@ def rgetattr(obj, attr, *args): def _getattr(obj, attr): return getattr(obj, attr, *args) return functools.reduce(_getattr, [obj] + attr.split('.')) + + + + +#Django keeps giving: exception=OperationalError('the connection is closed') +from django.db.utils import OperationalError +def retry_on_db_error( f ): + async def decorator( *args, **kwargs ): + while True: + try: + return await f( *args, **kwargs ) + except OperationalError as e: + await asyncio.sleep( 1 ) + return decorator + +def ignore_on_db_error( f ): + async def decorator( *args, **kwargs ): + while True: + try: + return await f( *args, **kwargs ) + except OperationalError as e: + return + return decorator diff --git a/asyncron/workers.py b/asyncron/workers.py index 09b16bd..3d48953 100644 --- a/asyncron/workers.py +++ b/asyncron/workers.py @@ -11,6 +11,8 @@ import asyncio import collections, functools import random +from .utils import retry_on_db_error, ignore_on_db_error + class AsyncronWorker: INSTANCES = [] #AsyncronWorker instance MAX_COUNT = 0 @@ -89,7 +91,6 @@ class 
AsyncronWorker: self.thread = threading.Thread( target = self.start ) self.thread.start() - def start( self, is_robust = False ): assert not hasattr(self, "loop"), "This worker is already running!" from .models import Worker, Task, Trace @@ -99,7 +100,7 @@ class AsyncronWorker: asyncio.set_event_loop( self.loop ) #Fight over who's gonna be the master, prove your health in the process! - self.loop.create_task( self.master_loop() ) + self.loop.create_task( retry_on_db_error(self.master_loop)() ) main_task = self.loop.create_task( self.work_loop() ) time.sleep(0.3) #To avoid the django initialization warning! @@ -124,10 +125,12 @@ class AsyncronWorker: self.loop.run_until_complete( self.graceful_shutdown() ) - count = Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update( - status_reason = "Worker died during execution", - status = "A", worker_lock = None - ) + try: + Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update( + status_reason = "Worker died during execution", + status = "A", worker_lock = None + ) + except: pass #if count: print(f"Had to cancel {count} task(s).") #cls.log.warning @@ -221,7 +224,7 @@ class AsyncronWorker: current_master = True if not self.clearing_dead_workers: - self.loop.create_task( self.clear_dead_workers() ) + self.loop.create_task( ignore_on_db_error(self.clear_dead_workers)() ) await self.sync_tasks() await self.clear_orphaned_traces() @@ -281,19 +284,19 @@ class AsyncronWorker: await self.check_scheduled() except OperationalError as e: self.log.warning(f"[Asyncron] DB Connection Error: {e}") - print( traceback.format_exc() ) + self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" ) self.check_interval = 60 #break except Exception as e: self.log.warning(f"[Asyncron] check_scheduled failed: {e}") - print( traceback.format_exc() ) + self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" ) self.check_interval = 20 try: await sync_to_async( close_old_connections )() 
except Exception as e: self.log.warning(f"[Asyncron] close_old_connections failed: {e}") - print( traceback.format_exc() ) + self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" ) #break @@ -330,7 +333,7 @@ class AsyncronWorker: except IntegrityError: count = 0 if not count: continue #Lost the race condition to another worker. - self.loop.create_task( self.start_trace_on_time( trace ) ) + self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) ) async def start_trace_on_time( self, trace ): from .models import Trace