263 lines
8.9 KiB
Python
263 lines
8.9 KiB
Python
from django.utils import timezone
|
|
from django.db import models
|
|
from django.db.models.constraints import UniqueConstraint, Q
|
|
|
|
# `patch` is used to mock print(): contextlib.redirect_stdout cannot be used in async code.
# This approach is buggy, has print leakage and is still not very good,
# but it is better than nothing when a task is not self_aware or calls something that isn't.
|
|
from unittest.mock import patch
|
|
|
|
|
|
import functools, traceback, io
|
|
import random
|
|
import asyncio
|
|
|
|
|
|
# Create your models here.
|
|
from .base.models import BaseModel
|
|
class Worker(BaseModel):
    """A worker, identified by an OS process id and a thread id.

    The string form is ``P<pid>W<thread_id>`` followed by ``R`` when the
    worker is robust and ``D`` otherwise.
    """

    pid = models.IntegerField()
    thread_id = models.PositiveBigIntegerField()
    is_robust = models.BooleanField(default=False)
    is_master = models.BooleanField(default=False)
    # If the worker sees this as True, it should kill itself!
    in_grace = models.BooleanField(default=False)

    # Variables with very feel good names! :)
    last_crowning_attempt = models.DateTimeField(null=True, blank=True)
    consumption_interval_seconds = models.IntegerField(default=10)
    consumption_total_active = models.IntegerField(default=0)

    def __str__(self):
        kind_suffix = "R" if self.is_robust else "D"
        return f"P{self.pid}W{self.thread_id}{kind_suffix}"

    class Meta:
        constraints = [
            # Conditional unique constraint: only rows with is_master=True
            # take part, so at most one worker can be master at a time.
            UniqueConstraint(
                fields=("is_master",),
                condition=Q(is_master=True),
                name="only_one_master",
            ),
        ]

    def is_proc_alive(self):
        """Return True when a process with this worker's pid appears to exist.

        Slightly altered from https://stackoverflow.com/a/20186516: the
        process is probed by sending it signal 0.

        NOTE: pid == 0 returns True (only negative pids are rejected up front).
        NOTE(review): this relies on POSIX signal-0 semantics -- verify
        before running on Windows.
        """
        import os

        target_pid = self.pid
        if target_pid < 0:
            return False

        try:
            os.kill(target_pid, 0)
        except ProcessLookupError:
            # errno.ESRCH: no such process.
            return False
        except PermissionError:
            # errno.EPERM: operation not permitted, i.e. the process exists.
            return True

        # No error: we are able to send a signal to the process.
        return True
|
|
|
|
class Task(BaseModel):
    """Definition of a runnable task, bound by name to a registered function.

    ``interval`` being set makes ``__str__`` label the task "Periodic"
    (otherwise "Callable"); ``timeout`` being None makes it a "Service"
    (otherwise a "Task").
    """

    # Class-level registry shared by every instance:
    # task name -> the function passed to register().
    registered_tasks = {}

    name = models.TextField(unique=True)  # Path to the function
    worker_lock = models.ForeignKey(Worker, null=True, blank=True, on_delete=models.SET_NULL)
    # NOTE(review): no max_length here, unlike the other one-character choice
    # fields in this file -- some database backends require it for CharField;
    # confirm this is intentional.
    worker_type = models.CharField(
        default="A",
        choices={
            "A": "Any",
            "R": "Robust",  # Only separate Robust workers
            "D": "Dynamic",  # Only on potentially reloadable workers
        },
    )

    max_completed_traces = models.IntegerField(default=10)
    max_failed_traces = models.IntegerField(default=1000)

    # None will mean it's a "service" like task
    timeout = models.DurationField(
        default=timezone.timedelta(minutes=5),
        null=True,
        blank=True,
    )
    gracetime = models.DurationField(default=timezone.timedelta(minutes=1))
    self_aware = models.BooleanField(
        default=False,
        help_text="Whether It's first argument is 'self', being a trace instance.",
    )

    # Periodic Tasks
    interval = models.DurationField(null=True, blank=True)
    jitter_length = models.DurationField(default=timezone.timedelta(seconds=0), blank=True)
    jitter_pivot = models.CharField(
        default="M",
        max_length=1,
        choices={
            "S": "Start",
            "M": "Middle",
            "E": "End",
        },
    )

    def get_jitter(self):
        """Return a random offset derived from ``jitter_length``.

        A random fraction of ``jitter_length`` is drawn, then positioned
        according to ``jitter_pivot``:

        * "S": used as drawn (never negative),
        * "M": shifted down by half of ``jitter_length``,
        * "E": negated.
        """
        span = self.jitter_length
        offset = span * random.random()

        pivot = self.jitter_pivot
        if pivot == "M":
            offset -= span / 2
        elif pivot == "E":
            offset *= -1

        return offset

    def __str__(self):
        schedule_label = "Periodic" if self.interval is not None else "Callable"
        mode_label = "Task" if self.timeout is not None else "Service"
        # Last dotted component of the function path.
        short_name = self.name.rpartition(".")[2]
        return " ".join([schedule_label, mode_label, short_name])

    def register(self, f):
        """Register ``f`` as the function of this task and return ``f``.

        When the task has no name yet, it is derived from the function's
        module and qualified name. The function also receives a ``task``
        attribute pointing back at this instance. Returning ``f`` unchanged
        allows use as a decorator.
        """
        if not self.name:
            self.name = f"{f.__module__}.{f.__qualname__}"

        f.task = self
        self.registered_tasks[self.name] = f
        return f

    def new_trace(self):
        """Build (without saving) a new Trace attached to this task."""
        fresh = Trace(task_id=self.id)
        fresh.task = self  # Less db hits
        return fresh

    async def ensure_quick_execution(self, reason="Quick Exec"):
        """Make sure a trace of this task is due to run right away.

        Nothing is done when a trace is already Waiting ("W"), or when a
        Scheduled ("S") trace is already due. Otherwise the earliest
        Scheduled trace -- or a brand new one when there is none -- is
        rescheduled to the current time and saved.
        """
        now = timezone.now()
        traces = self.trace_set

        already_waiting = await traces.filter(status="W").aexists()
        if already_waiting:
            return

        already_due = await traces.filter(status="S", scheduled_datetime__lte=now).aexists()
        if already_due:
            return

        earliest_scheduled = traces.filter(status="S").order_by("scheduled_datetime")
        trace = await earliest_scheduled.afirst()
        if not trace:
            trace = self.new_trace()

        await trace.reschedule(reason=reason, target_datetime=now)
        await trace.asave()
|
|
|
|
class Trace(BaseModel):
    """One scheduled, running or finished execution of a Task.

    Besides the persisted fields, ``start()`` attaches runtime-only
    attributes to the instance (``loop``, ``new_print`` and
    ``commit_on_new_print_task``) that ``print()`` relies on.
    """

    task = models.ForeignKey(Task, on_delete=models.CASCADE)

    status_reason = models.TextField(default="", blank=True)
    status = models.CharField(
        default="S",
        max_length=1,
        choices={
            "S": "Scheduled",
            "W": "Waiting",
            "R": "Running",
            "P": "Paused",
            "C": "Completed",
            "A": "Aborted",
            "E": "Error",
        },
    )

    def set_status(self, status, reason=""):
        """Set ``status`` and ``status_reason`` in memory (nothing is saved)."""
        self.status = status
        self.status_reason = reason

    scheduled_datetime = models.DateTimeField(null=True, blank=True)
    register_datetime = models.DateTimeField(auto_now_add=True)
    last_run_datetime = models.DateTimeField(null=True, blank=True)
    last_end_datetime = models.DateTimeField(null=True, blank=True)

    worker_lock = models.ForeignKey(Worker, null=True, blank=True, on_delete=models.SET_NULL)
    protected = models.BooleanField(default=False)  # Do not delete these.
    args = models.JSONField(default=list, blank=True)
    kwargs = models.JSONField(default=dict, blank=True)
    stdout = models.TextField(null=True, blank=True)
    stderr = models.TextField(null=True, blank=True)
    returned = models.JSONField(null=True, blank=True)

    def __str__(self):
        return f"Trace of Task {self.task}"

    class Meta:
        constraints = [
            # At most one Scheduled trace without a scheduled_datetime per task.
            UniqueConstraint(
                fields=["task_id"],
                condition=models.Q(status="S", scheduled_datetime=None),
                name="unique_unscheduled_for_task",
            )
        ]

    async def reschedule(self, reason="", target_datetime=None):
        """Put this trace back into the Scheduled state.

        Args:
            reason: Stored as ``status_reason``.
            target_datetime: When given, used as the new
                ``scheduled_datetime``. Otherwise the time is computed as
                ``last_run_datetime`` (or now) + the task's interval + jitter.

        Raises:
            AssertionError: If the trace is not Scheduled/Aborted/Error, or
                if neither the task has an interval nor a target is given.

        Only traces that already have an id are saved here, and only the
        three fields this method changes.
        """
        assert self.status in "SAE", f"Cannot reschedule a task that is in {self.get_status_display()} state!"

        # eval_related comes from BaseModel (not visible here); presumably it
        # makes sure self.task is loaded without a blocking query.
        await self.eval_related("task")
        assert self.task.interval or target_datetime, "This is not a periodic task! Nothing to reschedule."

        self.set_status("S", reason)
        if target_datetime:
            self.scheduled_datetime = target_datetime
        else:
            base_time = self.last_run_datetime or timezone.now()
            jitter = self.task.get_jitter()
            self.scheduled_datetime = base_time + self.task.interval + jitter

        if self.id:
            await self.asave(update_fields=["status", "status_reason", "scheduled_datetime"])

    async def start(self):
        """Run the function registered for this trace's task and record the result.

        Flow:
          1. Reset the per-run fields (timestamps, stdout, stderr, returned).
          2. Look up the function in ``Task.registered_tasks``; if missing,
             save status Error ("Script Missing!") and return.
          3. Save status Running, then await the function under the task's
             timeout (no timeout when ``task.timeout`` is falsy).
          4. Save the outcome: Completed with the return value, or Error with
             the traceback in ``stderr``.

        Self-aware tasks receive this trace as first argument; for the others
        ``builtins.print`` is patched to ``self.print`` while they run.

        Raises:
            AssertionError: If the trace is in a state that cannot be started.
        """
        await self.eval_related("task")
        assert self.status in "SPAWE", f"Cannot start a task that is in {self.get_status_display()} state!"

        self.last_run_datetime = timezone.now()
        self.last_end_datetime = None
        self.returned = None
        self.stderr = ""
        self.stdout = ""

        # Runtime bits
        self.loop = asyncio.get_running_loop()
        self.new_print = asyncio.Event()

        try:
            func = Task.registered_tasks[self.task.name]
        except KeyError:
            self.set_status("E", "Script Missing!")
            await self.asave()
            return

        self.set_status("R")
        await self.asave()

        # The background stdout-committer is started only once we know the
        # function will actually run, so the early return above cannot leave
        # a forever-pending task behind.
        self.commit_on_new_print_task = self.loop.create_task(self.commit_on_new_print())

        try:
            # Start with no deadline and arm it below, so a task without a
            # timeout (a "service") simply runs unbounded.
            async with asyncio.timeout(None) as tmcm:
                if self.task.timeout:
                    tmcm.reschedule(self.loop.time() + self.task.timeout.total_seconds())

                if self.task.self_aware:
                    output = await func(self, *self.args, **self.kwargs)
                else:
                    with patch("builtins.print", self.print):
                        output = await func(*self.args, **self.kwargs)

        except TimeoutError:
            self.set_status("E", "Timed out")
            self.stderr = traceback.format_exc()

        except Exception as e:
            self.set_status("E", f"Exception: {e}")
            self.stderr = traceback.format_exc()

        else:
            self.set_status("C")
            self.returned = output

        finally:
            # Always stop the committer, including when this coroutine is
            # cancelled (CancelledError is not caught by the handlers above).
            self.commit_on_new_print_task.cancel()

        self.last_end_datetime = timezone.now()
        await self.asave()

    ### Runtime methods for self aware tasks
    async def commit_on_new_print(self):
        """Save ``stdout`` every time ``print()`` signals new output.

        Runs until cancelled (``start()`` cancels it when the run ends).
        """
        while True:
            await self.new_print.wait()
            # Clear BEFORE saving: a print() that arrives while the save is
            # in flight sets the event again and triggers another save.
            # Clearing afterwards would wipe that signal and leave the new
            # output uncommitted until the next print.
            self.new_print.clear()
            await self.asave(update_fields=["stdout"])

    def print(self, *args, sep=" ", end="\n", file=None, flush=True):
        """Append print-style output to ``stdout``.

        Mirrors the signature of the builtin ``print``. ``file`` is accepted
        only to fool tasks that aren't self_aware and is ignored. When
        ``flush`` is true the background committer is woken up to save.

        Raises:
            AssertionError: If no committer task has been attached by
                ``start()``.
        """
        assert hasattr(self, 'commit_on_new_print_task'), "trace.print needs to be called while a trace.commit_on_new_print task is active!"
        self.stdout += sep.join(str(i) for i in args) + end
        if flush:
            self.new_print.set()
|
|
|
|
|
|
#https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/
|
|
from django.contrib.contenttypes.fields import GenericForeignKey
|
|
from django.contrib.contenttypes.models import ContentType
|
|
class Metadata(BaseModel):
    """Named JSON payload attached to an arbitrary model instance.

    The target is referenced through a generic foreign key made of
    ``model_type`` (content type) and ``model_id``. An optional
    ``expiration_datetime`` drives ``is_expired``.
    """

    model_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    model_id = models.PositiveIntegerField()
    model = GenericForeignKey("model_type", "model_id")

    name = models.CharField(max_length=256)
    data = models.JSONField(null=True, blank=True)
    expiration_datetime = models.DateTimeField(null=True, blank=True)

    @property
    def is_expired(self):
        """True when an expiration time is set and already in the past."""
        deadline = self.expiration_datetime
        if not deadline:
            # No expiration configured: never expires.
            return False
        return deadline < timezone.now()

    def __str__(self):
        return self.name

    class Meta:
        indexes = [
            # Supports lookups by the generic foreign key pair.
            models.Index(fields=["model_type", "model_id"]),
        ]
        verbose_name = 'Metadata'
        verbose_name_plural = 'Metadata'
|