forked from rc/aircox
!111 Co-authored-by: bkfox <thomas bkfox net> Reviewed-on: rc/aircox#114
This commit is contained in:
257
aircox/test.py
257
aircox/test.py
@ -1,7 +1,9 @@
|
||||
"""This module provide test utilities."""
|
||||
from collections import namedtuple
|
||||
import inspect
|
||||
|
||||
|
||||
__all__ = ("interface",)
|
||||
__all__ = ("interface", "Interface", "File")
|
||||
|
||||
|
||||
def interface(obj, funcs):
|
||||
@ -31,3 +33,256 @@ def interface_wrap(obj, attr, value):
|
||||
return value
|
||||
|
||||
setattr(obj, attr, wrapper)
|
||||
return wrapper
|
||||
|
||||
|
||||
InterfaceTarget = namedtuple(
|
||||
"InterfaceTarget",
|
||||
["target", "namespace", "key"],
|
||||
defaults=[("namespace", None), ("key", None)],
|
||||
)
|
||||
|
||||
|
||||
class WrapperMixin:
|
||||
type_interface = None
|
||||
"""For instance of class wrapped by an Interface, this is the wrapping
|
||||
interface of the class."""
|
||||
instances = None
|
||||
ns = None
|
||||
ns_attr = None
|
||||
|
||||
def __init__(
|
||||
self, target=None, ns=None, ns_attr=None, type_interface=None, **kwargs
|
||||
):
|
||||
self.target = target
|
||||
if ns:
|
||||
self.inject(ns, ns_attr)
|
||||
if self.type_interface:
|
||||
self._set_type_interface(type_interface)
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def _set_type_interface(self, type_interface):
|
||||
if self.type_interface:
|
||||
raise RuntimeError("a type interface is already assigned")
|
||||
|
||||
self.type_interface = type_interface
|
||||
if not type_interface.instances:
|
||||
type_interface.instances = [self]
|
||||
else:
|
||||
type_interface.instances.append(self)
|
||||
|
||||
@property
|
||||
def ns_target(self):
|
||||
"""Actual namespace's target (using ns.ns_attr)"""
|
||||
if self.ns and self.ns_attr:
|
||||
return getattr(self.ns, self.ns_attr, None)
|
||||
return None
|
||||
|
||||
def inject(self, ns=None, ns_attr=None):
|
||||
"""Inject interface into namespace at given key."""
|
||||
if not (ns and ns_attr):
|
||||
raise ValueError("ns and ns_attr must be provided together")
|
||||
|
||||
ns_target = getattr(ns, ns_attr, None)
|
||||
if self.target is ns_target:
|
||||
return
|
||||
elif self.target is not None and self.ns:
|
||||
raise RuntimeError(
|
||||
"self target already injected. It must be "
|
||||
"`release` before `inject`."
|
||||
)
|
||||
|
||||
self.target = ns_target
|
||||
setattr(ns, ns_attr, self.interface)
|
||||
|
||||
self.ns = ns
|
||||
self.ns_attr = ns_attr
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
"""Remove injection from previously injected parent, reset target."""
|
||||
if self.ns_target is self.interface:
|
||||
setattr(self.ns, self.ns_attr, self.target)
|
||||
self.target = None
|
||||
return self
|
||||
|
||||
|
||||
class SpoofMixin:
|
||||
traces = None
|
||||
|
||||
def __init__(self, funcs=None, **kwargs):
|
||||
self.reset(
|
||||
funcs or {},
|
||||
)
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def reset(self, funcs=None):
|
||||
self.traces = {}
|
||||
if funcs is not None:
|
||||
self.funcs = funcs
|
||||
|
||||
def get_trace(self, name, args=False, kw=False):
|
||||
"""Get a function call parameters.
|
||||
|
||||
:param str name: function name
|
||||
:param bool args: return positional arguments
|
||||
:param bool|str kwargs: return named arguments. If a string, get the \
|
||||
named argument at this key.
|
||||
:returns either a tuple of args, a dict of kwargs, or a tuple \
|
||||
of `(args, kwargs)`.
|
||||
:raises ValueError: the function has been called multiple time.
|
||||
"""
|
||||
trace = self.traces[name]
|
||||
if isinstance(trace, list):
|
||||
raise ValueError(f"{name} called multiple times.")
|
||||
return self._get_trace(trace, args=args, kw=kw)
|
||||
|
||||
def get_traces(self, name, args=False, kw=False):
|
||||
"""Get a tuple of all call parameters.
|
||||
|
||||
Parameters are the same as `get()`.
|
||||
"""
|
||||
traces = self.traces[name]
|
||||
if not isinstance(traces, list):
|
||||
traces = (traces,)
|
||||
return tuple(
|
||||
self._get_trace(trace, args=args, kw=kw) for trace in traces
|
||||
)
|
||||
|
||||
def _get_trace(self, trace, args=False, kw=False):
|
||||
if (args and kw) or (not args and not kw):
|
||||
return trace
|
||||
elif args:
|
||||
return trace[0]
|
||||
elif isinstance(kw, str):
|
||||
return trace[1][kw]
|
||||
return trace[1]
|
||||
|
||||
def call(self, name, args, kw):
|
||||
"""Add call for function of provided name, and return predefined
|
||||
result."""
|
||||
self.add(name, args, kw)
|
||||
return self.get_result(name, args, kw)
|
||||
|
||||
def add(self, name, args, kw):
|
||||
"""Add call parameters to `self.traces` for the function with the
|
||||
provided `name`."""
|
||||
trace = self.traces.get(name)
|
||||
if trace is None:
|
||||
self.traces[name] = (args, kw)
|
||||
elif isinstance(trace, tuple):
|
||||
self.traces[name] = [trace, (args, kw)]
|
||||
else:
|
||||
trace.append((args, kw))
|
||||
|
||||
def get_result(self, name, a, kw):
|
||||
"""Get result for the function of the provided `name`.
|
||||
|
||||
:raises KeyError: no registered function with this `name`.
|
||||
"""
|
||||
func = self.funcs[name]
|
||||
if callable(func):
|
||||
return func(self, *a, **kw)
|
||||
return func
|
||||
|
||||
|
||||
class Interface:
|
||||
class IMeta(SpoofMixin, WrapperMixin):
|
||||
def __init__(self, interface, **kwargs):
|
||||
self.interface = interface
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def clone(self, **kwargs):
|
||||
"""Return an Interface copying some values from self."""
|
||||
kwargs.update(
|
||||
{
|
||||
"target": self.target,
|
||||
"funcs": self.funcs,
|
||||
}
|
||||
)
|
||||
return type(self.interface)(_imeta_kw=kwargs)._imeta
|
||||
|
||||
def __getitem__(self, name):
|
||||
return self.traces[name]
|
||||
|
||||
_imeta = None
|
||||
"""This contains a InterfaceMeta instance related to Interface one.
|
||||
|
||||
`_imeta` is used to check tests etc.
|
||||
"""
|
||||
|
||||
def __init__(self, _target=None, _funcs=None, _imeta_kw=None, **kwargs):
|
||||
if _imeta_kw is None:
|
||||
_imeta_kw = {}
|
||||
if _funcs is not None:
|
||||
_imeta_kw.setdefault("funcs", _funcs)
|
||||
if _target is not None:
|
||||
_imeta_kw.setdefault("target", _target)
|
||||
self._imeta = self.IMeta(self, **_imeta_kw)
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
@property
|
||||
def _itarget(self):
|
||||
return self._imeta.target
|
||||
|
||||
@classmethod
|
||||
def inject(cls, ns, ns_attr, funcs=None, **kwargs):
|
||||
kwargs["_imeta_kw"] = {"ns": ns, "ns_attr": ns_attr, "funcs": funcs}
|
||||
return cls(**kwargs)
|
||||
|
||||
def _irelease(self):
|
||||
"""Shortcut to `self._imeta.release`."""
|
||||
self._imeta.release()
|
||||
self._imeta.reset()
|
||||
|
||||
def _trace(self, *args, **kw):
|
||||
"""Shortcut to `self._imeta.get_trace`."""
|
||||
return self._imeta.get_trace(*args, **kw)
|
||||
|
||||
def _traces(self, *args, **kw):
|
||||
"""Shortcut to `self._imeta.get_traces`."""
|
||||
return self._imeta.get_traces(*args, **kw)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
target = self._imeta.target
|
||||
if inspect.isclass(target):
|
||||
target = target(*args, **kwargs)
|
||||
return type(self)(
|
||||
target,
|
||||
_imeta_kw={"type_interface": self, "funcs": self._imeta.funcs},
|
||||
)
|
||||
|
||||
if "__call__" in self._imeta.funcs:
|
||||
return self._imeta.call("__call__", args, kwargs)
|
||||
|
||||
self._imeta.add("__call__", args, kwargs)
|
||||
return self._imeta.target(*args, **kwargs)
|
||||
|
||||
def __getattr__(self, attr):
|
||||
if attr in self._imeta.funcs:
|
||||
return lambda *args, **kwargs: self._imeta.call(attr, args, kwargs)
|
||||
return getattr(self._imeta.target, attr)
|
||||
|
||||
def __str__(self):
|
||||
iface = super().__str__()
|
||||
return f"{iface}::{self._imeta.target}"
|
||||
|
||||
|
||||
class File:
|
||||
def __init__(self, data=""):
|
||||
self.data = data
|
||||
|
||||
def read(self):
|
||||
return self.data
|
||||
|
||||
def write(self, data):
|
||||
self.data += data
|
||||
|
||||
def close(self):
|
||||
self.data = None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *_, **__):
|
||||
pass
|
||||
|
Reference in New Issue
Block a user