diff --git a/asyncron/__init__.py b/asyncron/__init__.py new file mode 100644 index 0000000..9b172ea --- /dev/null +++ b/asyncron/__init__.py @@ -0,0 +1,3 @@ +from .shortcuts import task, run_on_model_change + +__all__ = ['task', 'run_on_model_change'] diff --git a/asyncron/admin.py b/asyncron/admin.py new file mode 100644 index 0000000..12112e8 --- /dev/null +++ b/asyncron/admin.py @@ -0,0 +1,198 @@ +from django.contrib import admin +from django.utils import timezone +from django.db.models import F + +from .base.admin import BaseModelAdmin +from .models import Worker, Task, Trace, Metadata + +import os +import asyncio +import humanize + +@admin.register( Metadata ) +class MetadataAdmin( BaseModelAdmin ): + order = 3 + list_display = 'name', 'model_type', 'target', 'expiration' + + def target( self, obj ): return str(obj.model) + + def expiration( self, obj ): + if obj.expiration_datetime: return humanize.naturaltime( obj.expiration_datetime ) + return "Never" + expiration.admin_order_field = 'expiration_datetime' + + def has_add_permission( self, request, obj = None ): return False + def has_change_permission( self, request, obj = None ): return False + +@admin.register( Worker ) +class WorkerAdmin( BaseModelAdmin ): + order = 4 + list_display = 'pid', 'thread_id', 'is_robust', 'is_master', 'is_running', 'health', + def has_add_permission( self, request, obj = None ): return False + + def is_running( self, obj ): return obj.is_proc_alive() + is_running.boolean = True + + def health( self, obj ): + return (f"In Grace " if obj.in_grace else "") + humanize.naturaltime( obj.last_crowning_attempt ) + +@admin.register( Task ) +class TaskAdmin( BaseModelAdmin ): + order = 1 + list_display = 'name', 'timeout', 'gracetime', 'jitter', 'type', 'worker_type', 'logged_executions', 'last_execution', 'scheduled' + fields = ["name", "description", "type", "jitter"] + + actions = 'schedule_execution', 'execution_now', + def has_add_permission( self, request, obj = None ): return False + def has_delete_permission( self, request, obj = None ): return False + def has_change_permission( self, request, obj = None ): return False + + def description( self, obj ): + try: + return obj.registered_tasks[obj.name].__doc__.strip("\n") + except: + return "N/A" + + def jitter( self, obj ): + match obj.jitter_pivot: + case "S": sign = '+' + case "M": sign = '±' + case "E": sign = '-' + return f"{sign}{obj.jitter_length}" + + def type( self, obj ): + results = [] + + if obj.timeout is None: + results.append( "Service" ) + + if obj.interval: + delta = humanize.naturaldelta( obj.interval ) + delta = delta.replace("an ", "1 ").replace("a ", "1 ") + results.append( f"Periodic, every {delta}" ) + + if obj.name not in obj.registered_tasks: + results.append("Script Missing!") + + return ", ".join( results ) if results else "Callable" + + def logged_executions( self, obj ): + return obj.trace_set.exclude( status = "S" ).count() + + def last_execution( self, obj ): + last_trace = obj.trace_set.exclude( status = "S" ).exclude( last_run_datetime = None ).order_by('last_run_datetime').last() + return humanize.naturaltime( last_trace.last_run_datetime ) if last_trace else "Never" + + def scheduled( self, obj ): + return obj.trace_set.filter( status = "S" ).exists() + scheduled.boolean = True + + @admin.action( description = "(Re)Schedule an execution for periodic tasks" ) + def schedule_execution( self, request, qs ): + trace_ids = set() + for task in qs.exclude( interval = None ): + trace = task.new_trace() + trace.save() + trace.refresh_from_db() + trace_ids.add(trace.id) + + results = asyncio.run( Trace.objects.filter( id__in = trace_ids ).gather_method( 'reschedule', reason = "Manually Schedule" ) ) + self.explain_gather_results( request, results, 5 ) + + @admin.action( description = "Execute now!" ) + def execution_now( self, request, qs ): + trace_ids = set() + for task in qs: + trace = task.new_trace() + trace.status = "W" + trace.save() + trace.refresh_from_db() + trace_ids.add(trace.id) + + results = asyncio.run( Trace.objects.filter( id__in = trace_ids ).gather_method( 'start' ) ) + self.explain_gather_results( request, results, 5 ) + + +class TraceAppFilter(admin.SimpleListFilter): + title = "app" + parameter_name = 'app_groups' + + def lookups( self, request, model_admin ): + return ( + ( a.lower(), a ) + for a in sorted({ + task.name.split(".", 1)[0] + for task in Task.objects.all() + }) if a + ) + + def queryset( self, request, queryset ): + q = self.value() + if not q: return queryset + return queryset.filter( task__name__istartswith = q ) + +class TraceNameFilter(admin.SimpleListFilter): + title = "task name" + parameter_name = 'short_name' + + def lookups( self, request, model_admin ): + return ( + ( a.lower(), a ) + for a in sorted({ + task.name.rsplit(".", 1)[-1] + for task in Task.objects.all() + }) if a + ) + + def queryset( self, request, queryset ): + q = self.value() + if not q: return queryset + return queryset.filter( task__name__iendswith = q ) + + +@admin.register( Trace ) +class TraceAdmin( BaseModelAdmin ): + order = 2 + list_display = 'task', 'execution', 'state', 'worker_lock' + list_filter = TraceAppFilter, TraceNameFilter, 'task__worker_type', 'status', 'status_reason', + ordering = F('scheduled_datetime').desc(nulls_last=True), + #readonly_fields = [ f.name for f in Trace._meta.fields ] + + def has_add_permission( self, request, obj = None ): return False + + def execution( self, obj ): + if obj.last_run_datetime: + return "- Ran " + humanize.naturaltime( obj.last_run_datetime ) + + if obj.scheduled_datetime: + if obj.scheduled_datetime < timezone.now(): + return "- Should've run " + humanize.naturaltime( obj.scheduled_datetime ) + else: + return "+ In " + humanize.naturaltime( obj.scheduled_datetime ) + + return "Never" + execution.admin_order_field = 'scheduled_datetime' + + def state( self, obj ): + return f"{obj.status}: {obj.status_reason}" if obj.status_reason else f"{obj.get_status_display()}" + state.admin_order_field = 'status' + + + actions = 'reschedule_to_now', + @admin.action( description = "Reschedule to run now" ) + def reschedule_to_now( self, request, qs ): + results = asyncio.run( + qs.exclude( task__interval = None ).filter( status = "S" ).gather_method( + 'reschedule', + reason = "Manually Rescheduled", + target_datetime = timezone.now(), + ) + ) + self.explain_gather_results( request, results, 5 ) + + + + + + +# diff --git a/asyncron/apps.py b/asyncron/apps.py new file mode 100644 index 0000000..86ea225 --- /dev/null +++ b/asyncron/apps.py @@ -0,0 +1,34 @@ +from django.apps import AppConfig +from django.conf import settings +from django.apps import apps + +import pathlib, importlib, types + + +class AsyncronConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'asyncron' + + def ready( self ): + try: names = settings.ASYNCRON['IMPORT_PER_APP'] + except (KeyError, AttributeError): pass + else: self.import_per_app( names ) + + #Init the asyncron worker for this process + from .workers import AsyncronWorker + #The worker should not start working until they know we're responding to requests. + AsyncronWorker.init() + + def import_per_app( self, names ): + for app in apps.get_app_configs(): + + app_dir = pathlib.Path(app.path) + if app_dir.parent != settings.BASE_DIR: continue + + for name in names: + import_file = app_dir / f"{name}.py" + if not import_file.exists() or not import_file.is_file(): continue + + #print( f"Loading {app.name}.{name}:", import_file ) + loader = importlib.machinery.SourceFileLoader( f"{app.name}.{name}", str(import_file) ) + loader.exec_module( types.ModuleType(loader.name) ) diff --git a/asyncron/base/admin.py b/asyncron/base/admin.py new file mode 100644 index 0000000..58e7098 --- /dev/null +++ b/asyncron/base/admin.py @@ -0,0 +1,68 @@ +from django.contrib import admin, messages +from django.utils.safestring import mark_safe +from django.utils.html import escape +import traceback + +class BaseModelAdmin( admin.ModelAdmin ): + + def explain_gather_results( self, request, results, fails_to_show = 2 ): + failed = 0 + for id, e in results.items(): + if isinstance(e, BaseException): + failed += 1 + if failed <= fails_to_show: + if request.user.is_superuser: + traceback_message = ''.join(traceback.TracebackException.from_exception(e).format()) + self.message_user( request, mark_safe(f""" + Error For {id}: {e} + + [TraceBack] +
{escape(traceback_message)}
+ """), messages.ERROR
+ )
+ else: self.message_user( request, f"Error For {id}: {e}", messages.ERROR)
+
+
+ if failed == 0: self.message_user( request, f"All {len(results)} Succeeded!", messages.SUCCESS )
+ elif failed <= fails_to_show:
+ if len(results) - failed > 0:
+ self.message_user( request, f"All the rest ({len(results) - failed}) Succeeded!", messages.SUCCESS )
+ else: self.message_user( request, f"{len(results) - failed} Succeeded, {failed - fails_to_show} more failed!", messages.WARNING )
+
+
+import math
+from django.contrib import admin
+from django.apps import apps
+def get_app_list(self, request, app_label=None):
+ """
+ Return a sorted list of all the installed apps that have been
+ registered in this site.
+ """
+ app_dict = self._build_app_dict(request, app_label)
+ app_ordering = { app.name: index for index, app in enumerate( apps.get_app_configs() ) }
+
+ # Sort the apps by settings order, then alphabetically.
+ app_list = sorted(
+ app_dict.values(),
+ key = lambda x:
+ (
+ app_ordering.get( x["name"], math.inf ),
+ x["name"].lower()
+ )
+ )
+
+ # Sort the models admin.order/alphabetically within each app.
+ for app in app_list:
+ app["models"].sort(
+ key=lambda x:
+ (
+ getattr( admin.site.get_model_admin( x['model'] ), 'order', math.inf ),
+ x['name'].lower()
+ )
+ )
+
+ return app_list
+admin.AdminSite.get_app_list = get_app_list
diff --git a/asyncron/base/models.py b/asyncron/base/models.py
new file mode 100644
index 0000000..c33bb12
--- /dev/null
+++ b/asyncron/base/models.py
@@ -0,0 +1,56 @@
+from django.db import models
+from django.contrib.contenttypes.fields import GenericRelation
+from asgiref.sync import sync_to_async
+
+from asyncron.utils import rgetattr
+
+import asyncio
+
+class AsyncronQuerySet( models.QuerySet ):
+ async def gather_method( self, method, *args, **kwargs ):
+ mapping = {
+ instance.pk: getattr( instance, method )( *args, **kwargs )
+ async for instance in self
+ }
+ returns = await asyncio.gather( *list(mapping.values()), return_exceptions = True )
+ for index, pk in enumerate(mapping):
+ mapping[pk] = returns[index]
+
+ return mapping
+
+ def to_json( self, *structure ):
+ return [ m.fields_to_dict( *structure ) for m in self ]
+
+
+
+class BaseModel( models.Model ):
+ objects = AsyncronQuerySet.as_manager()
+ metadata = GenericRelation("asyncron.Metadata", content_type_field = 'model_type', object_id_field = 'model_id')
+
+ async def eval_related( self, *fields ):
+ if not fields:
+ fields = [ f.name for f in self._meta.fields if f.is_relation ]
+
+ #Since we're using an underscore variable
+ #This next line running correctly is optional,
+ #but helps reduce or eliminate 'sync_to_async' context switches.
+ try: fields = [ f for f in fields if f not in self._state.fields_cache ]
+ except: print("WARNING: could not check already cached relations.")
+
+ if fields:
+ await sync_to_async(lambda: [ getattr(self, f) for f in fields ])()
+
+
+ def fields_to_dict( self, *fields ):
+ """
+ To create json/dict from fields.
+ """
+ results = {}
+ for f in fields:
+ name, method = (f[0], f[1]) if isinstance(f, tuple) else (f, f)
+ value = method(self) if callable(method) else rgetattr(self, method)
+ results[name] = value() if callable(value) else value
+ return results
+
+ class Meta:
+ abstract = True
diff --git a/asyncron/gunicorn.py b/asyncron/gunicorn.py
new file mode 100644
index 0000000..3a75450
--- /dev/null
+++ b/asyncron/gunicorn.py
@@ -0,0 +1,38 @@
+##
+## - Gunicorn compatibility
+## Add this to gunicorn.py conf file:
+## from asyncron.gunicorn import post_fork
+##
+## adds an asyncron worker in each gunicorn worker process
+## Hooks into 'dev reload' and 'exist signals' for graceful termination of tasks
+##
+
+def post_fork( server, worker ): #worker and AsyncronWorker, pay attention!
+ post_fork.server = server
+ post_fork.worker = worker
+
+ from .workers import AsyncronWorker
+ AsyncronWorker.log = worker.log
+ AsyncronWorker.log.info("Asyncron worker attached.")
+
+ init_to_override = AsyncronWorker.init
+ def init( *args, **kwargs ):
+ AsyncronWorker.MAX_COUNT = 1
+ AsyncronWorker.override_exit_signals()
+
+ to_override = worker.reloader._callback
+ def new_callback(*args, **kwargs):
+ AsyncronWorker.stop( reason = "Auto Reload" )
+ return to_override(*args, **kwargs)
+ worker.reloader._callback = new_callback
+
+ return init_to_override( *args, **kwargs )
+ AsyncronWorker.init = init
+
+
+# Keeping the worker in post_fork.worker so we can add extra files it for it to track
+# TODO: Currently unfinished, since i just realized using the "inotify" support of gunicorn
+# makes this reduntant, but still here is the relevant code if I want to also support the simpler
+# polling system
+# Should be in asyncron.app.ready
+# -> post_fork.worker.reloader.add_extra_file
diff --git a/asyncron/management/commands/run_asyncron_worker.py b/asyncron/management/commands/run_asyncron_worker.py
new file mode 100644
index 0000000..166fb74
--- /dev/null
+++ b/asyncron/management/commands/run_asyncron_worker.py
@@ -0,0 +1,88 @@
+##
+#
+# Command: python manage.py startasyncron
+#
+##
+
+import logging
+import asyncio
+import time
+
+from django.core.management.base import BaseCommand, CommandError
+from django.conf import settings
+
+from asyncron.workers import AsyncronWorker
+from asyncron.models import Task
+
+class bcolors:
+ HEADER = '\033[95m'
+ OKBLUE = '\033[94m'
+ OKCYAN = '\033[96m'
+ OKGREEN = '\033[92m'
+ WARNING = '\033[93m'
+ FAIL = '\033[91m'
+ ENDC = '\033[0m'
+ BOLD = '\033[1m'
+ UNDERLINE = '\033[4m'
+
+class Command(BaseCommand):
+ help = 'Start an Asyncorn Worker'
+
+ def handle( self, *arg, **kwargs ):
+ AsyncronWorker.log = logging.getLogger(__name__)
+
+ worker = AsyncronWorker( daemon = False )
+ print( "Starting:", worker )
+ worker.start( is_robust = True )
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ #Older Stuff
+ def maintain_tasks( self ):
+ for name, func in Task.registered_tasks.items():
+ try: task = Task.objects.get( name = name )
+ except: task = func.task.save()
+
+ def handle_mgr( self, *arg, **kwargs ):
+ from multiprocessing.connection import Listener
+ import multiprocessing
+
+ from asyncron.manager import PoolManager
+ PoolManager.init_manager()
+ print( "Coordinator:", PoolManager.coordinator )
+
+ address = PoolManager.coordinator.split("unix:", 1)[-1]
+ with Listener( address, authkey = settings.SECRET_KEY.encode() ) as listener:
+
+ while True:
+
+ try:
+ with listener.accept() as conn:
+ print( "New Conn:", conn )
+ while msg := conn.recv():
+ func, args, kwargs, name, repeat_interval, timeout_after, execution_context, execution_pool = msg
+ print( "Msg:", msg )
+
+ asyncio.run( func( *args, **kwargs ) )
+ print("Ran.")
+
+ except EOFError:
+ print("Connection Closed.")
+ continue
+ except KeyboardInterrupt:
+ print("Stopping...")
+ break
+
+
+#
diff --git a/asyncron/migrations/__init__.py b/asyncron/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/asyncron/models.py b/asyncron/models.py
new file mode 100644
index 0000000..bb0d74f
--- /dev/null
+++ b/asyncron/models.py
@@ -0,0 +1,246 @@
+from django.utils import timezone
+from django.db import models
+from django.db.models.constraints import UniqueConstraint, Q
+from unittest.mock import patch #to mock print, can't use redirect_stdout in async code
+
+import functools, traceback, io
+import random
+import asyncio
+
+
+# Create your models here.
+from .base.models import BaseModel
+class Worker( BaseModel ):
+ pid = models.IntegerField()
+ thread_id = models.PositiveBigIntegerField()
+ is_robust = models.BooleanField( default = False )
+ is_master = models.BooleanField( default = False )
+ in_grace = models.BooleanField( default = False ) #If the worker sees this as True, it should kill itself!
+
+ #Variables with very feel good names! :)
+ last_crowning_attempt = models.DateTimeField( null = True, blank = True )
+ consumption_interval_seconds = models.IntegerField( default = 10 )
+ consumption_total_active = models.IntegerField( default = 0 )
+ def __str__( self ): return f"P{self.pid}W{self.thread_id}" + ("R" if self.is_robust else "D")
+
+ class Meta:
+ constraints = [
+ UniqueConstraint( fields = ('is_master',), condition = Q( is_master = True ), name='only_one_master'),
+ ]
+
+ def is_proc_alive( self ):
+ import os
+ pid = self.pid #Slightly Altered: https://stackoverflow.com/a/20186516
+ if pid < 0: return False #NOTE: pid == 0 returns True
+ try: os.kill(pid, 0)
+ except ProcessLookupError: return False # errno.ESRCH: No such process
+ except PermissionError: return True # errno.EPERM: Operation not permitted (i.e., process exists)
+ else: return True # no error, we can send a signal to the process
+
+class Task( BaseModel ):
+ registered_tasks = {} #Name -> self
+ name = models.TextField( unique = True ) #Path to the function
+ worker_lock = models.ForeignKey( Worker, null = True, blank = True, on_delete = models.SET_NULL )
+ worker_type = models.CharField( default = "A", choices = {
+ "A": "Any",
+ "R": "Robust", #Only seperate Robust workers
+ "D": "Dynamic", #Only on potentially reloadable workers
+ })
+
+ max_completed_traces = models.IntegerField( default = 10 )
+ max_failed_traces = models.IntegerField( default = 1000 )
+
+ timeout = models.DurationField(
+ default = timezone.timedelta( minutes = 5 ),
+ null = True, blank = True
+ ) #None will mean it's a "service" like task
+ gracetime = models.DurationField( default = timezone.timedelta( minutes = 1 ) )
+
+ #Periodic Tasks
+ interval = models.DurationField( null = True, blank = True )
+ jitter_length = models.DurationField( default = timezone.timedelta( seconds = 0 ), blank = True )
+ jitter_pivot = models.CharField( default = "M", max_length = 1, choices = {
+ "S":"Start", "M":"Middle", "E":"End",
+ })
+ def get_jitter( self ):
+ jitter = self.jitter_length * random.random()
+ match self.jitter_pivot:
+ case "M":
+ jitter -= self.jitter_length / 2
+ case "E":
+ jitter *= -1
+ return jitter
+
+
+ def __str__( self ):
+ type = "Callable" if self.interval is None else "Periodic"
+ mode = "Service" if self.timeout is None else "Task"
+ short = self.name.rsplit('.')[-1]
+ return " ".join([type, mode, short])
+
+ def register( self, f ):
+ if not self.name: self.name = f"{f.__module__}.{f.__qualname__}"
+ self.registered_tasks[self.name] = f
+ f.task = self
+ return f
+
+ def new_trace( self ):
+ trace = Trace( task_id = self.id )
+ trace.task = self #Less db hits
+ return trace
+
+
+ async def ensure_quick_execution( self, reason = "Quick Exec" ):
+ now = timezone.now()
+ if await self.trace_set.filter( status = "W" ).aexists():
+ return
+
+ if await self.trace_set.filter( status = "S", scheduled_datetime__lte = now ).aexists():
+ return
+
+ trace = await self.trace_set.filter( status = "S" ).order_by('scheduled_datetime').afirst()
+ if not trace: trace = self.new_trace()
+ await trace.reschedule( reason = reason, target_datetime = now )
+ await trace.asave()
+
+class Trace( BaseModel ):
+ task = models.ForeignKey( Task, on_delete = models.CASCADE )
+
+ status_reason = models.TextField( default = "", blank = True )
+ status = models.CharField( default = "S", max_length = 1, choices = {
+ "S":"Scheduled",
+ "W":"Waiting",
+ "R":"Running",
+ "P":"Paused",
+ "C":"Completed",
+ "A":"Aborted",
+ "E":"Error",
+ })
+ def set_status( self, status, reason = "" ):
+ self.status = status
+ self.status_reason = reason
+
+ scheduled_datetime = models.DateTimeField( null = True, blank = True )
+ register_datetime = models.DateTimeField( auto_now_add = True )
+ last_run_datetime = models.DateTimeField( null = True, blank = True )
+ last_end_datetime = models.DateTimeField( null = True, blank = True )
+
+ worker_lock = models.ForeignKey( Worker, null = True, blank = True, on_delete = models.SET_NULL )
+ protected = models.BooleanField( default = False ) #Do not delete these.
+ args = models.JSONField( default = list, blank = True )
+ kwargs = models.JSONField( default = dict, blank = True )
+ stdout = models.TextField( null = True, blank = True )
+ stderr = models.TextField( null = True, blank = True )
+ returned = models.JSONField( null = True, blank = True )
+
+ def __str__( self ): return f"Trace of Task {self.task}"
+
+ class Meta:
+ constraints = [
+ UniqueConstraint(
+ fields = ['task_id'],
+ condition = models.Q(status = "S", scheduled_datetime = None),
+ name = "unique_unscheduled_for_task",
+ )
+ ]
+
+
+ async def reschedule( self, reason = "", target_datetime = None ):
+ assert self.status in "SAE", f"Cannot reschedule a task that is in {self.get_status_display()} state!"
+
+ await self.eval_related('task')
+ assert self.task.interval, "This is not a periodic task! Nothing to reschedule."
+
+ self.set_status( "S", reason )
+ if target_datetime:
+ self.scheduled_datetime = target_datetime
+
+ else:
+ base_time = self.last_run_datetime or timezone.now()
+ jitter = self.task.get_jitter()
+ self.scheduled_datetime = base_time + self.task.interval + jitter
+
+ if self.id: await self.asave( update_fields = ["status", "status_reason", "scheduled_datetime"] )
+
+
+ async def start( self ):
+ #assert self.status == "W", f"Cannot start a task that is not Waiting ({self.get_status_display()})."
+ await self.eval_related('task')
+ assert self.status in "SPAWE", f"Cannot start a task that is in {self.get_status_display()} state!"
+
+ self.last_run_datetime = timezone.now()
+ self.last_end_datetime = None
+ self.returned = None
+ self.stderr = ""
+ self.stdout = ""
+
+ try:
+ func = Task.registered_tasks[self.task.name]
+ except KeyError:
+ self.set_status( "E", "Script Missing!" )
+ return
+ else:
+ self.set_status( "R" )
+ finally:
+ await self.asave()
+
+
+ #Create an io object to read the print output
+ new_lines = asyncio.Event() #TODO: So we can update the db mid task in an async way
+
+ def confined_print( *args, sep = " ", end = "\n", **kwargs ):
+ self.stdout += sep.join( str(i) for i in args ) + end
+ new_lines.set()
+
+ try:
+ with patch( 'builtins.print', confined_print ):
+ output = await func( *self.args, **self.kwargs )
+
+ except Exception as e:
+ self.set_status( "E", f"Exception: {e}" )
+ self.stderr = traceback.format_exc()
+
+ else:
+ self.set_status( "C" )
+ self.returned = output
+
+ finally:
+ self.last_end_datetime = timezone.now()
+ await self.asave()
+
+
+ #TODO: Cool stuff to add later
+ #callee = models.TextField()
+ #caller = models.TextField()
+ #repeatable = models.BooleanField( default = True ) #Unless something in args or kwargs is unserializable!
+ #tags = models.JSONField( default = list )
+ #
+
+
+
+
+#https://docs.djangoproject.com/en/5.1/ref/contrib/contenttypes/
+from django.contrib.contenttypes.fields import GenericForeignKey
+from django.contrib.contenttypes.models import ContentType
+class Metadata( BaseModel ):
+ model_type = models.ForeignKey( ContentType, on_delete = models.CASCADE )
+ model_id = models.PositiveIntegerField()
+ model = GenericForeignKey("model_type", "model_id")
+
+ name = models.CharField( max_length = 256 )
+ data = models.JSONField( null = True, blank = True )
+ expiration_datetime = models.DateTimeField( null = True, blank = True )
+
+ @property
+ def is_expired( self ):
+ if self.expiration_datetime: return self.expiration_datetime < timezone.now()
+ return False
+
+
+ def __str__(self): return self.name
+
+ class Meta:
+ indexes = [
+ models.Index(fields=["model_type", "model_id"]),
+ ]
+ verbose_name = verbose_name_plural = 'Metadata'
diff --git a/asyncron/shortcuts.py b/asyncron/shortcuts.py
new file mode 100644
index 0000000..cd0571a
--- /dev/null
+++ b/asyncron/shortcuts.py
@@ -0,0 +1,74 @@
+##
+## decorators / functions to make the task calls easier
+##
+from django.utils.dateparse import parse_duration
+from django.db import models
+from django.utils import timezone
+from django.apps import apps
+import re
+
+# Regular expression pattern with named groups for "1w2d5h30m10s500ms1000us" without spaces
+pattern = re.compile(
+ r'(\+|-)?'
+ r'(?:(?P