asyncron/asyncron/base/models.py
2025-03-13 22:53:33 +03:30

77 lines
2.4 KiB
Python

from django.db import models
from django.contrib.contenttypes.fields import GenericRelation
from asgiref.sync import sync_to_async
from asyncron.utils import rgetattr
import asyncio
class AsyncronQuerySet(models.QuerySet):
    """QuerySet with helpers for concurrent method calls and dict dumps."""

    async def gather_method(self, method, *args, **kwargs):
        """
        Call ``method`` on every instance and await all the results together.

        The attribute named ``method`` is looked up on each instance of the
        queryset and called with ``*args`` / ``**kwargs``; the returned
        awaitables are then run through a single ``asyncio.gather`` with
        ``return_exceptions=True``.

        A ``NotImplementedError`` raised synchronously by the call itself
        (e.g. an asyncron extension that does not match) is not allowed to
        abort the loop: it is deferred into a tiny coroutine that re-raises
        it, so it surfaces as that instance's entry in the result.

        Returns a dict mapping each instance's ``pk`` to either the value
        its awaitable produced or the exception it raised.
        """
        async def reraise_later(error):
            raise error

        pending = {}
        async for obj in self:
            try:
                awaitable = getattr(obj, method)(*args, **kwargs)
            except NotImplementedError as error:
                awaitable = reraise_later(error)
            pending[obj.pk] = awaitable

        outcomes = await asyncio.gather(*pending.values(), return_exceptions=True)

        # Dicts preserve insertion order, so the keys line up with the
        # positional results returned by gather.
        return dict(zip(pending, outcomes))

    def to_json(self, *structure):
        """Return ``fields_to_dict(*structure)`` of every instance, as a list."""
        dumped = []
        for obj in self:
            dumped.append(obj.fields_to_dict(*structure))
        return dumped
class BaseModel(models.Model):
    """
    Abstract base model for asyncron models.

    Provides the ``AsyncronQuerySet`` manager, a generic ``metadata``
    relation, an async helper to pre-load relations, and a small registry of
    named "presentations" used by ``fields_to_dict``.
    """

    objects = AsyncronQuerySet.as_manager()
    metadata = GenericRelation(
        "asyncron.Metadata",
        content_type_field='model_type',
        object_id_field='model_id',
    )

    async def eval_related(self, *fields):
        """
        Access the given relation attributes inside one ``sync_to_async`` call.

        ``fields`` are attribute names; when none are given, every field in
        ``self._meta.fields`` flagged ``is_relation`` is used. Names already
        present in ``self._state.fields_cache`` are skipped.
        """
        if not fields:
            fields = [f.name for f in self._meta.fields if f.is_relation]

        # `_state.fields_cache` is a private attribute, so this filtering is
        # treated as optional: if it fails we just evaluate every field.
        # When it works it reduces or eliminates 'sync_to_async' context
        # switches.
        try:
            fields = [f for f in fields if f not in self._state.fields_cache]
        except Exception:
            # Best-effort only; `Exception` (not a bare except) so that
            # KeyboardInterrupt / SystemExit are not swallowed here.
            print("WARNING: could not check already cached relations.")

        if fields:
            await sync_to_async(lambda: [getattr(self, f) for f in fields])()

    # Registry of presentations, keyed by name. `presentation` gives each
    # class its own copy on first registration, so this base dict is only the
    # (empty) starting point.
    _model_presentations = {}

    @classmethod
    def presentation(cls, name):
        """
        Return a decorator that registers its target as presentation ``name``.

        The decorated object is stored under ``name`` in this class's
        presentation registry and returned unchanged.
        """
        def attacher_decorator(f):
            # Copy-on-write: without this every subclass would mutate the one
            # dict defined on the base class, and presentations of unrelated
            # models would leak into / overwrite each other. Entries that
            # were inherited at the time of the copy stay visible.
            if '_model_presentations' not in cls.__dict__:
                cls._model_presentations = dict(cls._model_presentations)
            cls._model_presentations[name] = f
            return f
        return attacher_decorator

    def fields_to_dict(self, *fields, presentation_name=None):
        """
        To create json/dict from fields.

        Each entry of ``fields`` is either a single item used both as the
        output key and as the accessor, or a ``(key, accessor)`` tuple. A
        callable accessor is called with ``self``; anything else is resolved
        with ``rgetattr(self, accessor)``. If the resolved value is itself
        callable it is called with no arguments.

        When ``presentation_name`` is given, the ``fields`` attribute of the
        presentation registered under that name is appended to ``fields``.

        Raises ``ValueError`` if ``presentation_name`` is not registered.
        """
        # `*fields` arrives as a tuple; a list is needed to append to it.
        fields = list(fields)

        if presentation_name:
            if presentation_name not in self._model_presentations:
                raise ValueError(
                    f"This model '{self.__class__}' does not have a '{presentation_name}' presentation!"
                )
            # NOTE(review): the registered object is expected to expose an
            # iterable `.fields` attribute -- confirm against the call sites
            # of `presentation`.
            fields.extend(self._model_presentations[presentation_name].fields)

        results = {}
        for f in fields:
            name, method = (f[0], f[1]) if isinstance(f, tuple) else (f, f)
            value = method(self) if callable(method) else rgetattr(self, method)
            results[name] = value() if callable(value) else value
        return results

    class Meta:
        abstract = True