asyncron/asyncron/admin.py
Oracle f6ce4fc374 Minor improvements and version bump
Made traces uneditable in django admin (looks better now)
Extender gives better error message on lazy related fields in async context
made self_aware default True for tasks
2025-08-09 17:21:37 +02:00

214 lines
6.4 KiB
Python

from django.contrib import admin
from django.utils import timezone
from django.db.models import F
from .base.admin import BaseModelAdmin
from .models import Worker, Task, Trace, Metadata
import os
import asyncio
import humanize
@admin.register( Metadata )
class MetadataAdmin( BaseModelAdmin ):
order = 3
list_display = 'name', 'model_type', 'target', 'expiration'
def target( self, obj ): return str(obj.model)
def expiration( self, obj ):
if obj.expiration_datetime: return humanize.naturaltime( obj.expiration_datetime )
return "Never"
expiration.admin_order_field = 'expiration_datetime'
def has_add_permission( self, request, obj = None ): return False
def has_change_permission( self, request, obj = None ): return False
@admin.register( Worker )
class WorkerAdmin( BaseModelAdmin ):
order = 4
list_display = 'pid', 'thread_id', 'is_robust', 'is_master', 'is_running', 'health',
def has_add_permission( self, request, obj = None ): return False
def is_running( self, obj ): return obj.is_proc_alive()
is_running.boolean = True
def health( self, obj ):
return (f"In Grace " if obj.in_grace else "") + humanize.naturaltime( obj.last_crowning_attempt )
@admin.register( Task )
class TaskAdmin( BaseModelAdmin ):
order = 1
list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged', 'last_execution', 'scheduled'
fields = ["name", "description", "type", "on_model_change", "jitter", "self_aware"]
actions = 'schedule_execution', 'execution_now', 'delete_script_missing'
def has_add_permission( self, request, obj = None ): return False
def has_delete_permission( self, request, obj = None ): return False
def has_change_permission( self, request, obj = None ): return False
def description( self, obj ):
try:
return obj.registered_tasks[obj.name].__doc__.strip("\n")
except:
return "N/A"
def jitter( self, obj ):
match obj.jitter_pivot:
case "S": sign = '+'
case "M": sign = '±'
case "E": sign = '-'
return f"{sign}{obj.jitter_length}"
def type( self, obj ):
results = []
if obj.timeout is None:
results.append( "Service" )
if obj.interval:
delta = humanize.naturaldelta( obj.interval )
delta = delta.replace("an ", "1 ").replace("a ", "1 ")
results.append( f"Periodic, every {delta}" )
if obj.name not in obj.registered_tasks:
results.append("Script Missing!")
return ", ".join( results ) if results else "Callable"
def on_model_change( self, obj ):
try: return ", ".join( f"{m.__module__}.{m.__name__}" for m in obj.registered_tasks[obj.name].watching_models )
except: return "N/A"
def logged( self, obj ):
count = obj.trace_set.exclude( status = "S" ).count()
if count == 1: return "1 trace"
return f"{count} traces"
def last_execution( self, obj ):
last_trace = obj.trace_set.exclude( status = "S" ).exclude( last_run_datetime = None ).order_by('last_run_datetime').last()
if not last_trace: return "Never"
return "Ongoing..." if last_trace.status == "R" else humanize.naturaltime( last_trace.last_run_datetime )
def scheduled( self, obj ):
return obj.trace_set.filter( status = "S" ).exists()
scheduled.boolean = True
@admin.action( description = "(Re)Schedule an execution for periodic tasks" )
def schedule_execution( self, request, qs ):
trace_ids = set()
for task in qs.exclude( interval = None ):
trace = task.new_trace()
trace.save()
trace.refresh_from_db()
trace_ids.add(trace.id)
results = asyncio.run( Trace.objects.filter( id__in = trace_ids ).gather_method( 'reschedule', reason = "Manually Schedule" ) )
self.explain_gather_results( request, results, 5 )
@admin.action( description = "Execute now!" )
def execution_now( self, request, qs ):
trace_ids = set()
for task in qs:
trace = task.new_trace()
trace.status = "W"
trace.save()
trace.refresh_from_db()
trace_ids.add(trace.id)
results = asyncio.run( Trace.objects.filter( id__in = trace_ids ).gather_method( 'start' ) )
self.explain_gather_results( request, results, 5 )
@admin.action( description = "Delete tasks with missing scripts" )
def delete_script_missing( self, request, qs ):
for task in qs:
if task.name not in task.registered_tasks:
task.delete()
class TraceAppFilter(admin.SimpleListFilter):
title = "app"
parameter_name = 'app_groups'
def lookups( self, request, model_admin ):
return (
( a.lower(), a )
for a in sorted({
task.name.split(".", 1)[0]
for task in Task.objects.all()
}) if a
)
def queryset( self, request, queryset ):
q = self.value()
if not q: return queryset
return queryset.filter( task__name__istartswith = q )
class TraceNameFilter(admin.SimpleListFilter):
title = "task name"
parameter_name = 'short_name'
def lookups( self, request, model_admin ):
return (
( a.lower(), a )
for a in sorted({
task.name.rsplit(".", 1)[-1]
for task in Task.objects.all()
}) if a
)
def queryset( self, request, queryset ):
q = self.value()
if not q: return queryset
return queryset.filter( task__name__iendswith = q )
@admin.register( Trace )
class TraceAdmin( BaseModelAdmin ):
order = 2
list_display = 'task', 'execution', 'state', 'worker_lock'
list_filter = TraceAppFilter, TraceNameFilter, 'task__worker_type', 'status', 'status_reason',
ordering = F('scheduled_datetime').desc(nulls_last=True),
#readonly_fields = [ f.name for f in Trace._meta.fields ]
def has_add_permission( self, request, obj = None ): return False
def has_change_permission( self, request, obj = None ): return False
def execution( self, obj ):
if obj.last_run_datetime:
return "- Ran " + humanize.naturaltime( obj.last_run_datetime )
if obj.scheduled_datetime:
if obj.scheduled_datetime < timezone.now():
return "- Should've run " + humanize.naturaltime( obj.scheduled_datetime )
else:
return "+ In " + humanize.naturaltime( obj.scheduled_datetime )
return "Never"
execution.admin_order_field = 'scheduled_datetime'
def state( self, obj ):
return f"{obj.status}: {obj.status_reason}" if obj.status_reason else f"{obj.get_status_display()}"
state.admin_order_field = 'status'
actions = 'reschedule_to_now',
@admin.action( description = "Reschedule to run now" )
def reschedule_to_now( self, request, qs ):
results = asyncio.run(
qs.exclude( task__interval = None ).filter( status = "S" ).gather_method(
'reschedule',
reason = "Manually Rescheduled",
target_datetime = timezone.now(),
)
)
self.explain_gather_results( request, results, 5 )
#