50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
import collections
|
|
import functools
|
|
from .utils import rgetattr
|
|
|
|
class Extender:
    """Registry of conditional method implementations attachable to a class.

    An instance is used as a decorator factory: ``@extender(*checks, **filters)``
    records the decorated function under its ``__name__``.  Once
    ``attach(cls)`` has been called, an attribute lookup on an instance of
    ``cls`` that fails through the normal mechanism and names a registered
    function yields a callable which runs the first registered candidate
    whose checks and filters all match that instance.
    """

    # Set by __init__ to the instance being constructed; cleared by
    # stop_capturing().
    capturing_instance = None

    @classmethod
    def stop_capturing(cls):
        """Reset ``capturing_instance`` to ``None``."""
        cls.capturing_instance = None

    def __init__(self, model):
        """Create an empty registry and record it as the capturing instance.

        Args:
            model: Stored unchanged on ``self.model``; this class does not
                use it otherwise.
        """
        self.__class__.capturing_instance = self
        self.model = model
        # method_name -> list of (function, checks, filters), in
        # registration order.
        self.extensions = collections.defaultdict(list)

    def __call__(self, *checks, **filters):
        """Return a decorator that registers a function as a candidate.

        Args:
            *checks: Callables invoked with the instance; each must return a
                truthy value for the candidate to be selected.
            **filters: Attribute-path/value pairs.  Every ``__`` in a key is
                replaced with ``.`` and the path is resolved on the instance
                with ``rgetattr``; the result must equal the given value.

        Returns:
            A decorator that appends ``(f, checks, filters)`` to the list
            stored under ``f.__name__`` and returns ``f`` unchanged.
        """
        def decorator(f):
            self.extensions[f.__name__].append((f, checks, filters))
            return f

        return decorator

    # The first parameter is the Extender instance; it is not called ``self``
    # so that ``self`` stays free for the generated ``__getattr__`` below.
    def attach(extender, cls):
        """Install a ``__getattr__`` on ``cls`` that dispatches to extensions.

        The assignment replaces any ``__getattr__`` defined directly on
        ``cls``.  Names that are not registered extensions are forwarded to a
        ``__getattr__`` found further up the MRO of ``cls``, if there is one.

        Args:
            cls: The class that receives the generated ``__getattr__``.

        The generated ``__getattr__`` raises ``AttributeError`` when the name
        is not a registered extension and no ancestor provides
        ``__getattr__``.  The callable it returns for a registered name
        raises ``NotImplementedError`` when no candidate matches the
        instance.
        """
        def __getattr__(self, extended_name):
            if extended_name not in extender.extensions:
                # Not one of ours: defer to an inherited __getattr__.
                # ``super(cls, self)`` is anchored on ``cls`` (not
                # ``self.__class__``) so subclasses of ``cls`` do not
                # recurse back into this function.
                try:
                    super_getattr = super(cls, self).__getattr__
                except AttributeError:
                    raise AttributeError(
                        f"{cls} Model has no extension '{extended_name}'."
                    ) from None
                return super_getattr(extended_name)

            method_candidates = extender.extensions[extended_name]

            def run_matching_candidate(*args, **kwargs):
                # Candidates are tried in registration order; the first one
                # whose checks and filters all pass handles the call.
                for f, checks, filters in method_candidates:
                    if not all(check(self) for check in checks):
                        continue
                    if any(
                        rgetattr(self, key.replace("__", ".")) != value
                        for key, value in filters.items()
                    ):
                        continue
                    return f(self, *args, **kwargs)
                raise NotImplementedError(
                    f"{self} Did not match any extensions for '{extended_name}'."
                )

            return run_matching_candidate

        cls.__getattr__ = __getattr__
|