Fixed django_signal handler regression, bumped version and more error messages now
This commit is contained in:
parent
c2e89d48d5
commit
de2885d20d
|
|
@ -36,5 +36,6 @@ def ignore_on_db_error( f ):
|
||||||
try:
|
try:
|
||||||
return await f( *args, **kwargs )
|
return await f( *args, **kwargs )
|
||||||
except OperationalError as e:
|
except OperationalError as e:
|
||||||
return
|
raise #return For DEV
|
||||||
|
|
||||||
return decorator
|
return decorator
|
||||||
|
|
|
||||||
|
|
@ -87,11 +87,27 @@ class AsyncronWorker:
|
||||||
self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks
|
self.watching_models = collections.defaultdict( set ) # Model -> Set of key name of the tasks
|
||||||
self.work_loop_over = asyncio.Event()
|
self.work_loop_over = asyncio.Event()
|
||||||
self.database_unreachable = False
|
self.database_unreachable = False
|
||||||
|
self.all_tasks = []
|
||||||
|
|
||||||
if daemon:
|
if daemon:
|
||||||
self.thread = threading.Thread( target = self.start )
|
self.thread = threading.Thread( target = self.start )
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
|
||||||
|
def create_task( self, coro, *, silent = False, name = None, context = None ):
|
||||||
|
|
||||||
|
if not silent:
|
||||||
|
origina_coro = coro
|
||||||
|
async def coro():
|
||||||
|
try: return await origina_coro
|
||||||
|
except:
|
||||||
|
self.log.warning(f"[Asyncron] Task Error {origina_coro} {name}:\n{traceback.format_exc()}" )
|
||||||
|
raise
|
||||||
|
|
||||||
|
self.all_tasks.append( self.loop.create_task( coro(), name = name, context = context ) )
|
||||||
|
|
||||||
|
for task in list(self.all_tasks):
|
||||||
|
if task.done(): self.all_tasks.remove(task)
|
||||||
|
|
||||||
def start( self, is_robust = False ):
|
def start( self, is_robust = False ):
|
||||||
assert not hasattr(self, "loop"), "This worker is already running!"
|
assert not hasattr(self, "loop"), "This worker is already running!"
|
||||||
from .models import Worker, Task, Trace
|
from .models import Worker, Task, Trace
|
||||||
|
|
@ -101,8 +117,8 @@ class AsyncronWorker:
|
||||||
asyncio.set_event_loop( self.loop )
|
asyncio.set_event_loop( self.loop )
|
||||||
|
|
||||||
#Fight over who's gonna be the master, prove your health in the process!
|
#Fight over who's gonna be the master, prove your health in the process!
|
||||||
self.loop.create_task( retry_on_db_error(self.master_loop)() )
|
self.create_task( retry_on_db_error(self.master_loop)() )
|
||||||
main_task = self.loop.create_task( self.work_loop() )
|
main_task = self.create_task( self.work_loop() )
|
||||||
|
|
||||||
time.sleep(0.3) #To avoid the django initialization warning!
|
time.sleep(0.3) #To avoid the django initialization warning!
|
||||||
self.model.save()
|
self.model.save()
|
||||||
|
|
@ -182,7 +198,7 @@ class AsyncronWorker:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
self.start_task_now( task ),
|
self.start_task_now( task, reason = f"Change ({signal_name}) on {instance}" ),
|
||||||
self.loop
|
self.loop
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -242,7 +258,7 @@ class AsyncronWorker:
|
||||||
current_master = True
|
current_master = True
|
||||||
|
|
||||||
if not self.clearing_dead_workers:
|
if not self.clearing_dead_workers:
|
||||||
self.loop.create_task( ignore_on_db_error(self.clear_dead_workers)() )
|
self.create_task( self.clear_dead_workers() )
|
||||||
|
|
||||||
await self.sync_tasks()
|
await self.sync_tasks()
|
||||||
await self.clear_orphaned_traces()
|
await self.clear_orphaned_traces()
|
||||||
|
|
@ -360,7 +376,7 @@ class AsyncronWorker:
|
||||||
except IntegrityError: count = 0
|
except IntegrityError: count = 0
|
||||||
if not count: continue #Lost the race condition to another worker.
|
if not count: continue #Lost the race condition to another worker.
|
||||||
|
|
||||||
self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
self.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
||||||
|
|
||||||
async def check_services( self ):
|
async def check_services( self ):
|
||||||
from .models import Task, Trace
|
from .models import Task, Trace
|
||||||
|
|
@ -386,16 +402,16 @@ class AsyncronWorker:
|
||||||
await trace.asave()
|
await trace.asave()
|
||||||
|
|
||||||
#self.running_service_tasks[task.id] =
|
#self.running_service_tasks[task.id] =
|
||||||
self.loop.create_task( self.start_trace_on_time( trace ) )
|
self.create_task( self.start_trace_on_time( trace ) )
|
||||||
await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None )
|
await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None )
|
||||||
|
|
||||||
async def start_task_now( self, task ):
|
async def start_task_now( self, task, reason = "" ):
|
||||||
trace = task.new_trace()
|
trace = task.new_trace()
|
||||||
trace.set_status( "S", f"Change ({signal_name}) on {instance}" )
|
trace.set_status( "S", reason )
|
||||||
trace.scheduled_datetime = timezone.now() #So it runs instantly
|
trace.scheduled_datetime = timezone.now() #So it runs instantly
|
||||||
trace.worker_lock_id = self.model.id
|
trace.worker_lock_id = self.model.id
|
||||||
|
|
||||||
self.loop.create_task( ignore_on_db_error(self.start_trace_on_time)( trace ) )
|
self.create_task( self.start_trace_on_time( trace ) )
|
||||||
|
|
||||||
async def start_trace_on_time( self, trace ):
|
async def start_trace_on_time( self, trace ):
|
||||||
from .models import Trace
|
from .models import Trace
|
||||||
|
|
|
||||||
2
setup.py
2
setup.py
|
|
@ -2,7 +2,7 @@ from setuptools import setup, find_packages
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='asyncron',
|
name='asyncron',
|
||||||
version='0.1.10',
|
version='0.1.10.1',
|
||||||
packages=find_packages(),
|
packages=find_packages(),
|
||||||
#include_package_data=True, # Include static files from MANIFEST.in
|
#include_package_data=True, # Include static files from MANIFEST.in
|
||||||
install_requires=[
|
install_requires=[
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user