Better logging, less crashed
This commit is contained in:
parent
c6ecc71517
commit
7dfe29b10d
|
|
@ -4,7 +4,7 @@
|
|||
#
|
||||
##
|
||||
|
||||
import logging
|
||||
import traceback, logging
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
|
|
@ -31,9 +31,24 @@ class Command(BaseCommand):
|
|||
def handle( self, *arg, **kwargs ):
|
||||
AsyncronWorker.log = logging.getLogger(__name__)
|
||||
|
||||
while True:
|
||||
|
||||
worker = AsyncronWorker( daemon = False )
|
||||
print( "Starting:", worker )
|
||||
|
||||
try:
|
||||
worker.start( is_robust = True )
|
||||
except Exception as e:
|
||||
print("Worker Died with an error! Restarting in 10 seconds, traceback:")
|
||||
print( traceback.format_exc() )
|
||||
|
||||
#TODO: worker.cleanup_tasks()
|
||||
#worker.INSTANCES.remove( worker )
|
||||
#del worker
|
||||
time.sleep( 10 )
|
||||
|
||||
else: break
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -208,7 +208,6 @@ class Trace( BaseModel ):
|
|||
self.set_status( "C" )
|
||||
self.returned = output
|
||||
|
||||
finally:
|
||||
self.commit_on_new_print_task.cancel()
|
||||
self.last_end_datetime = timezone.now()
|
||||
await self.asave()
|
||||
|
|
|
|||
|
|
@ -8,3 +8,26 @@ def rgetattr(obj, attr, *args):
|
|||
def _getattr(obj, attr):
|
||||
return getattr(obj, attr, *args)
|
||||
return functools.reduce(_getattr, [obj] + attr.split('.'))
|
||||
|
||||
|
||||
|
||||
|
||||
#Django keeps giving: exception=OperationalError('the connection is closed')
|
||||
from django.db.utils import OperationalError
|
||||
def retry_on_db_error( f ):
|
||||
async def decorator( *args, **kwargs ):
|
||||
while True:
|
||||
try:
|
||||
return await f( *args, **kwargs )
|
||||
except OperationalError as e:
|
||||
await asyncio.sleep( 1 )
|
||||
return decorator
|
||||
|
||||
def ignore_on_db_error( f ):
|
||||
async def decorator( *args, **kwargs ):
|
||||
while True:
|
||||
try:
|
||||
return await f( *args, **kwargs )
|
||||
except OperationalError as e:
|
||||
return
|
||||
return decorator
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ import asyncio
|
|||
import collections, functools
|
||||
import random
|
||||
|
||||
from .utils import retry_on_db_error, ignore_on_db_error
|
||||
|
||||
class AsyncronWorker:
|
||||
INSTANCES = [] #AsyncronWorker instance
|
||||
MAX_COUNT = 0
|
||||
|
|
@ -89,7 +91,6 @@ class AsyncronWorker:
|
|||
self.thread = threading.Thread( target = self.start )
|
||||
self.thread.start()
|
||||
|
||||
|
||||
def start( self, is_robust = False ):
|
||||
assert not hasattr(self, "loop"), "This worker is already running!"
|
||||
from .models import Worker, Task, Trace
|
||||
|
|
@ -99,7 +100,7 @@ class AsyncronWorker:
|
|||
asyncio.set_event_loop( self.loop )
|
||||
|
||||
#Fight over who's gonna be the master, prove your health in the process!
|
||||
self.loop.create_task( self.master_loop() )
|
||||
self.loop.create_task( retry_on_db_error(self.master_loop)() )
|
||||
main_task = self.loop.create_task( self.work_loop() )
|
||||
|
||||
time.sleep(0.3) #To avoid the django initialization warning!
|
||||
|
|
@ -124,10 +125,12 @@ class AsyncronWorker:
|
|||
|
||||
self.loop.run_until_complete( self.graceful_shutdown() )
|
||||
|
||||
count = Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update(
|
||||
try:
|
||||
Trace.objects.filter( status__in = "SWRP", worker_lock = self.model ).update(
|
||||
status_reason = "Worker died during execution",
|
||||
status = "A", worker_lock = None
|
||||
)
|
||||
except: pass
|
||||
|
||||
#if count: print(f"Had to cancel {count} task(s).") #cls.log.warning
|
||||
|
||||
|
|
@ -221,7 +224,7 @@ class AsyncronWorker:
|
|||
current_master = True
|
||||
|
||||
if not self.clearing_dead_workers:
|
||||
self.loop.create_task( self.clear_dead_workers() )
|
||||
self.loop.create_task( ignore_on_db_error(self.clear_dead_workers)() )
|
||||
|
||||
await self.sync_tasks()
|
||||
await self.clear_orphaned_traces()
|
||||
|
|
@ -281,19 +284,19 @@ class AsyncronWorker:
|
|||
await self.check_scheduled()
|
||||
except OperationalError as e:
|
||||
self.log.warning(f"[Asyncron] DB Connection Error: {e}")
|
||||
print( traceback.format_exc() )
|
||||
self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
|
||||
self.check_interval = 60 #break
|
||||
|
||||
except Exception as e:
|
||||
self.log.warning(f"[Asyncron] check_scheduled failed: {e}")
|
||||
print( traceback.format_exc() )
|
||||
self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
|
||||
self.check_interval = 20
|
||||
|
||||
try:
|
||||
await sync_to_async( close_old_connections )()
|
||||
except Exception as e:
|
||||
self.log.warning(f"[Asyncron] close_old_connections failed: {e}")
|
||||
print( traceback.format_exc() )
|
||||
self.log.warning(f"[Asyncron] Traceback:\n{traceback.format_exc()}" )
|
||||
#break
|
||||
|
||||
|
||||
|
|
@ -330,7 +333,7 @@ class AsyncronWorker:
|
|||
except IntegrityError: count = 0
|
||||
if not count: continue #Lost the race condition to another worker.
|
||||
|
||||
self.loop.create_task( self.start_trace_on_time( trace ) )
|
||||
self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
||||
|
||||
async def start_trace_on_time( self, trace ):
|
||||
from .models import Trace
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user