Added services support and self_aware tasks

This commit is contained in:
Oracle 2025-01-21 23:34:38 +01:00
parent 5503d22a6d
commit 9ab4e3ea8d
6 changed files with 71 additions and 15 deletions

View File

@ -1,4 +1,4 @@
from .shortcuts import task, run_on_model_change, extension from .shortcuts import task, service, run_on_model_change, extension
__all__ = [ __all__ = [
'task', 'task',

View File

@ -40,7 +40,7 @@ class WorkerAdmin( BaseModelAdmin ):
class TaskAdmin( BaseModelAdmin ): class TaskAdmin( BaseModelAdmin ):
order = 1 order = 1
list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged_executions', 'last_execution', 'scheduled' list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged_executions', 'last_execution', 'scheduled'
fields = ["name", "description", "type", "on_model_change", "jitter"] fields = ["name", "description", "type", "on_model_change", "jitter", "self_aware"]
actions = 'schedule_execution', 'execution_now', 'delete_script_missing' actions = 'schedule_execution', 'execution_now', 'delete_script_missing'
def has_add_permission( self, request, obj = None ): return False def has_add_permission( self, request, obj = None ): return False

View File

@ -0,0 +1,18 @@
# Generated by Django 5.1.2 on 2025-01-21 22:21
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('asyncron', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='task',
name='self_aware',
field=models.BooleanField(default=False, help_text="Whether It's first argument is 'self', being a trace instance."),
),
]

View File

@ -55,6 +55,7 @@ class Task( BaseModel ):
null = True, blank = True null = True, blank = True
) #None will mean it's a "service" like task ) #None will mean it's a "service" like task
gracetime = models.DurationField( default = timezone.timedelta( minutes = 1 ) ) gracetime = models.DurationField( default = timezone.timedelta( minutes = 1 ) )
self_aware = models.BooleanField( default = False, help_text = "Whether It's first argument is 'self', being a trace instance." )
#Periodic Tasks #Periodic Tasks
interval = models.DurationField( null = True, blank = True ) interval = models.DurationField( null = True, blank = True )
@ -164,7 +165,6 @@ class Trace( BaseModel ):
async def start( self ): async def start( self ):
#assert self.status == "W", f"Cannot start a task that is not Waiting ({self.get_status_display()})."
await self.eval_related('task') await self.eval_related('task')
assert self.status in "SPAWE", f"Cannot start a task that is in {self.get_status_display()} state!" assert self.status in "SPAWE", f"Cannot start a task that is in {self.get_status_display()} state!"
@ -184,13 +184,23 @@ class Trace( BaseModel ):
finally: finally:
await self.asave() await self.asave()
if self.task.self_aware:
func = functools.partial(func, self)
#Create an io object to read the print output #Create an io object to read the print output
new_lines = asyncio.Event() #TODO: So we can update the db mid task in an async way new_print = asyncio.Event() #TODO: So we can update the db mid task in an async way
def confined_print( *args, sep = " ", end = "\n", **kwargs ): def confined_print( *args, sep = " ", end = "\n", **kwargs ):
self.stdout += sep.join( str(i) for i in args ) + end self.stdout += sep.join( str(i) for i in args ) + end
new_lines.set() new_print.set()
async def confined_print_flush():
while True:
await new_print.wait()
await self.asave( update_fields = ['stdout'] )
new_print.clear()
confined_print_flush_task = asyncio.create_task( confined_print_flush() )
try: try:
with patch( 'builtins.print', confined_print ): with patch( 'builtins.print', confined_print ):
@ -205,18 +215,11 @@ class Trace( BaseModel ):
self.returned = output self.returned = output
finally: finally:
confined_print_flush_task.cancel()
self.last_end_datetime = timezone.now() self.last_end_datetime = timezone.now()
await self.asave() await self.asave()
#TODO: Cool stuff to add later
#callee = models.TextField()
#caller = models.TextField()
#repeatable = models.BooleanField( default = True ) #Unless something in args or kwargs is unserializable!
#tags = models.JSONField( default = list )
#
#https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/ #https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/

View File

@ -46,6 +46,12 @@ def task( *args, **kwargs ):
return Task( *args, **kwargs ).register return Task( *args, **kwargs ).register
def service( *args, **kwargs ):
kwargs.setdefault( 'timeout', None )
kwargs.setdefault( 'worker_type', "R" )
kwargs.setdefault( 'self_aware', True )
return task( *args, **kwargs )
def run_on_model_change( *models ): def run_on_model_change( *models ):
models = [ models = [

View File

@ -272,6 +272,7 @@ class AsyncronWorker:
await asyncio.sleep( self.check_interval ) await asyncio.sleep( self.check_interval )
self.check_interval = 10 self.check_interval = 10
try: try:
await self.check_services()
await self.check_scheduled() await self.check_scheduled()
await sync_to_async( close_old_connections )() await sync_to_async( close_old_connections )()
except OperationalError as e: except OperationalError as e:
@ -321,8 +322,10 @@ class AsyncronWorker:
async def start_trace_on_time( self, trace ): async def start_trace_on_time( self, trace ):
from .models import Trace from .models import Trace
if trace.task.timeout: #If this is from a periodic task
await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() ) await asyncio.sleep( ( timezone.now() - trace.scheduled_datetime ).total_seconds() )
await trace.arefresh_from_db() await trace.arefresh_from_db()
await trace.start() await trace.start()
trace.worker_lock = None trace.worker_lock = None
await trace.asave( update_fields = ['worker_lock'] ) await trace.asave( update_fields = ['worker_lock'] )
@ -337,3 +340,29 @@ class AsyncronWorker:
await QuerySet.exclude( await QuerySet.exclude(
id__in = QuerySet[:max_count].values_list( 'id', flat = True ) id__in = QuerySet[:max_count].values_list( 'id', flat = True )
).adelete() ).adelete()
async def check_services( self ):
from .models import Task, Trace
#start services that aren't running yet.
Ts = Task.objects.filter( interval = None ).exclude(
trace__status__in = "WR"
).exclude( worker_type = "D" if self.model.is_robust else "R" )
async for task in Ts:
locked = await Task.objects.filter( id = task.id ).filter(
models.Q(worker_lock = None) |
models.Q(worker_lock = self.model) #This is incase the lock has been aquired for some reason before.
).aupdate( worker_lock = self.model )
if not locked: continue
trace = task.new_trace()
trace.status = "W"
trace.worker_lock = self.model
await trace.asave()
#self.running_service_tasks[task.id] =
self.loop.create_task( self.start_trace_on_time( trace ) )
await Task.objects.filter( id = task.id, worker_lock = self.model ).aupdate( worker_lock = None )