aircox-radiocampus/aircox/test.py
Thomas Kairos f7a61fe6c0 Feat: packaging (#127)
- Add configuration files for packaging
- Precommit now uses ruff

Co-authored-by: bkfox <thomas bkfox net>
Reviewed-on: rc/aircox#127
2023-10-11 10:58:34 +02:00

282 lines
8.3 KiB
Python

"""This module provide test utilities."""
from collections import namedtuple
import inspect
__all__ = ("interface", "Interface", "File")
def interface(obj, funcs):
"""Override provided object's functions using dict of funcs, as
``{func_name: return_value}``.
Attribute ``obj.calls`` is a dict with all call done using those
methods, as ``{func_name: (args, kwargs) | list[(args, kwargs]]}``.
"""
if not isinstance(getattr(obj, "calls", None), dict):
obj.calls = {}
for attr, value in funcs.items():
interface_wrap(obj, attr, value)
def interface_wrap(obj, attr, value):
obj.calls[attr] = None
def wrapper(*a, **kw):
call = obj.calls.get(attr)
if call is None:
obj.calls[attr] = (a, kw)
elif isinstance(call, tuple):
obj.calls[attr] = [call, (a, kw)]
else:
call.append((a, kw))
return value
setattr(obj, attr, wrapper)
return wrapper
InterfaceTarget = namedtuple(
"InterfaceTarget",
["target", "namespace", "key"],
defaults=[("namespace", None), ("key", None)],
)
class WrapperMixin:
type_interface = None
"""For instance of class wrapped by an Interface, this is the wrapping
interface of the class."""
instances = None
ns = None
ns_attr = None
def __init__(self, target=None, ns=None, ns_attr=None, type_interface=None, **kwargs):
self.target = target
if ns:
self.inject(ns, ns_attr)
if self.type_interface:
self._set_type_interface(type_interface)
super().__init__(**kwargs)
def _set_type_interface(self, type_interface):
if self.type_interface:
raise RuntimeError("a type interface is already assigned")
self.type_interface = type_interface
if not type_interface.instances:
type_interface.instances = [self]
else:
type_interface.instances.append(self)
@property
def ns_target(self):
"""Actual namespace's target (using ns.ns_attr)"""
if self.ns and self.ns_attr:
return getattr(self.ns, self.ns_attr, None)
return None
def inject(self, ns=None, ns_attr=None):
"""Inject interface into namespace at given key."""
if not (ns and ns_attr):
raise ValueError("ns and ns_attr must be provided together")
ns_target = getattr(ns, ns_attr, None)
if self.target is ns_target:
return
elif self.target is not None and self.ns:
raise RuntimeError("self target already injected. It must be " "`release` before `inject`.")
self.target = ns_target
setattr(ns, ns_attr, self.interface)
self.ns = ns
self.ns_attr = ns_attr
return self
def release(self):
"""Remove injection from previously injected parent, reset target."""
if self.ns_target is self.interface:
setattr(self.ns, self.ns_attr, self.target)
self.target = None
return self
class SpoofMixin:
traces = None
def __init__(self, funcs=None, **kwargs):
self.reset(
funcs or {},
)
super().__init__(**kwargs)
def reset(self, funcs=None):
self.traces = {}
if funcs is not None:
self.funcs = funcs
def get_trace(self, name="__call__", args=False, kw=False):
"""Get a function call parameters.
:param str name: function name
:param bool args: return positional arguments
:param bool|str kwargs: return named arguments. If a string, get the \
named argument at this key.
:returns either a tuple of args, a dict of kwargs, or a tuple \
of `(args, kwargs)`.
:raises ValueError: the function has been called multiple time.
"""
trace = self.traces[name]
if isinstance(trace, list):
raise ValueError(f"{name} called multiple times.")
return self._get_trace(trace, args=args, kw=kw)
def get_traces(self, name="__call__", args=False, kw=False):
"""Get a tuple of all call parameters.
Parameters are the same as `get()`.
"""
traces = self.traces[name]
if not isinstance(traces, list):
traces = (traces,)
return tuple(self._get_trace(trace, args=args, kw=kw) for trace in traces)
def _get_trace(self, trace, args=False, kw=False):
if (args and kw) or (not args and not kw):
return trace
elif args:
return trace[0]
elif isinstance(kw, str):
return trace[1][kw]
return trace[1]
def call(self, name, args, kw):
"""Add call for function of provided name, and return predefined
result."""
self.add(name, args, kw)
return self.get_result(name, args, kw)
def add(self, name, args, kw):
"""Add call parameters to `self.traces` for the function with the
provided `name`."""
trace = self.traces.get(name)
if trace is None:
self.traces[name] = (args, kw)
elif isinstance(trace, tuple):
self.traces[name] = [trace, (args, kw)]
else:
trace.append((args, kw))
def get_result(self, name, a, kw):
"""Get result for the function of the provided `name`.
:raises KeyError: no registered function with this `name`.
"""
func = self.funcs[name]
if callable(func):
return func(self, *a, **kw)
return func
class Interface:
class IMeta(SpoofMixin, WrapperMixin):
def __init__(self, interface, **kwargs):
self.interface = interface
super().__init__(**kwargs)
def clone(self, **kwargs):
"""Return an Interface copying some values from self."""
kwargs.update(
{
"target": self.target,
"funcs": self.funcs,
}
)
return type(self.interface)(_imeta_kw=kwargs)._imeta
def __getitem__(self, name):
return self.traces[name]
_imeta = None
"""This contains a InterfaceMeta instance related to Interface one.
`_imeta` is used to check tests etc.
"""
def __init__(self, _target=None, _funcs=None, _imeta_kw=None, **kwargs):
if _imeta_kw is None:
_imeta_kw = {}
if _funcs is not None:
_imeta_kw.setdefault("funcs", _funcs)
if _target is not None:
_imeta_kw.setdefault("target", _target)
self._imeta = self.IMeta(self, **_imeta_kw)
self.__dict__.update(kwargs)
@property
def _itarget(self):
return self._imeta.target
@classmethod
def inject(cls, ns, ns_attr, funcs=None, **kwargs):
kwargs["_imeta_kw"] = {"ns": ns, "ns_attr": ns_attr, "funcs": funcs}
return cls(**kwargs)
def _irelease(self):
"""Shortcut to `self._imeta.release`."""
self._imeta.release()
self._imeta.reset()
def _trace(self, *args, **kw):
"""Shortcut to `self._imeta.get_trace`."""
return self._imeta.get_trace(*args, **kw)
def _traces(self, *args, **kw):
"""Shortcut to `self._imeta.get_traces`."""
return self._imeta.get_traces(*args, **kw)
def __call__(self, *args, **kwargs):
target = self._imeta.target
if inspect.isclass(target):
target = target(*args, **kwargs)
return type(self)(
target,
_imeta_kw={"type_interface": self, "funcs": self._imeta.funcs},
)
if "__call__" in self._imeta.funcs:
return self._imeta.call("__call__", args, kwargs)
self._imeta.add("__call__", args, kwargs)
return self._imeta.target(*args, **kwargs)
def __getattr__(self, attr):
if attr in self._imeta.funcs:
return lambda *args, **kwargs: self._imeta.call(attr, args, kwargs)
return getattr(self._imeta.target, attr)
def __str__(self):
iface = super().__str__()
return f"{iface}::{self._imeta.target}"
class File:
def __init__(self, data=""):
self.data = data
def read(self):
return self.data
def write(self, data):
self.data += data
def close(self):
self.data = None
def __enter__(self):
return self
def __exit__(self, *_, **__):
pass