"""This module provide test utilities.""" from collections import namedtuple import inspect __all__ = ("interface", "Interface", "File") def interface(obj, funcs): """Override provided object's functions using dict of funcs, as ``{func_name: return_value}``. Attribute ``obj.calls`` is a dict with all call done using those methods, as ``{func_name: (args, kwargs) | list[(args, kwargs]]}``. """ if not isinstance(getattr(obj, "calls", None), dict): obj.calls = {} for attr, value in funcs.items(): interface_wrap(obj, attr, value) def interface_wrap(obj, attr, value): obj.calls[attr] = None def wrapper(*a, **kw): call = obj.calls.get(attr) if call is None: obj.calls[attr] = (a, kw) elif isinstance(call, tuple): obj.calls[attr] = [call, (a, kw)] else: call.append((a, kw)) return value setattr(obj, attr, wrapper) return wrapper InterfaceTarget = namedtuple( "InterfaceTarget", ["target", "namespace", "key"], defaults=[("namespace", None), ("key", None)], ) class WrapperMixin: type_interface = None """For instance of class wrapped by an Interface, this is the wrapping interface of the class.""" instances = None ns = None ns_attr = None def __init__(self, target=None, ns=None, ns_attr=None, type_interface=None, **kwargs): self.target = target if ns: self.inject(ns, ns_attr) if self.type_interface: self._set_type_interface(type_interface) super().__init__(**kwargs) def _set_type_interface(self, type_interface): if self.type_interface: raise RuntimeError("a type interface is already assigned") self.type_interface = type_interface if not type_interface.instances: type_interface.instances = [self] else: type_interface.instances.append(self) @property def ns_target(self): """Actual namespace's target (using ns.ns_attr)""" if self.ns and self.ns_attr: return getattr(self.ns, self.ns_attr, None) return None def inject(self, ns=None, ns_attr=None): """Inject interface into namespace at given key.""" if not (ns and ns_attr): raise ValueError("ns and ns_attr must be provided together") ns_target = getattr(ns, ns_attr, None) if self.target is ns_target: return elif self.target is not None and self.ns: raise RuntimeError("self target already injected. It must be " "`release` before `inject`.") self.target = ns_target setattr(ns, ns_attr, self.interface) self.ns = ns self.ns_attr = ns_attr return self def release(self): """Remove injection from previously injected parent, reset target.""" if self.ns_target is self.interface: setattr(self.ns, self.ns_attr, self.target) self.target = None return self class SpoofMixin: traces = None def __init__(self, funcs=None, **kwargs): self.reset( funcs or {}, ) super().__init__(**kwargs) def reset(self, funcs=None): self.traces = {} if funcs is not None: self.funcs = funcs def get_trace(self, name="__call__", args=False, kw=False): """Get a function call parameters. :param str name: function name :param bool args: return positional arguments :param bool|str kwargs: return named arguments. If a string, get the \ named argument at this key. :returns either a tuple of args, a dict of kwargs, or a tuple \ of `(args, kwargs)`. :raises ValueError: the function has been called multiple time. """ trace = self.traces[name] if isinstance(trace, list): raise ValueError(f"{name} called multiple times.") return self._get_trace(trace, args=args, kw=kw) def get_traces(self, name="__call__", args=False, kw=False): """Get a tuple of all call parameters. Parameters are the same as `get()`. """ traces = self.traces[name] if not isinstance(traces, list): traces = (traces,) return tuple(self._get_trace(trace, args=args, kw=kw) for trace in traces) def _get_trace(self, trace, args=False, kw=False): if (args and kw) or (not args and not kw): return trace elif args: return trace[0] elif isinstance(kw, str): return trace[1][kw] return trace[1] def call(self, name, args, kw): """Add call for function of provided name, and return predefined result.""" self.add(name, args, kw) return self.get_result(name, args, kw) def add(self, name, args, kw): """Add call parameters to `self.traces` for the function with the provided `name`.""" trace = self.traces.get(name) if trace is None: self.traces[name] = (args, kw) elif isinstance(trace, tuple): self.traces[name] = [trace, (args, kw)] else: trace.append((args, kw)) def get_result(self, name, a, kw): """Get result for the function of the provided `name`. :raises KeyError: no registered function with this `name`. """ func = self.funcs[name] if callable(func): return func(self, *a, **kw) return func class Interface: class IMeta(SpoofMixin, WrapperMixin): def __init__(self, interface, **kwargs): self.interface = interface super().__init__(**kwargs) def clone(self, **kwargs): """Return an Interface copying some values from self.""" kwargs.update( { "target": self.target, "funcs": self.funcs, } ) return type(self.interface)(_imeta_kw=kwargs)._imeta def __getitem__(self, name): return self.traces[name] _imeta = None """This contains a InterfaceMeta instance related to Interface one. `_imeta` is used to check tests etc. """ def __init__(self, _target=None, _funcs=None, _imeta_kw=None, **kwargs): if _imeta_kw is None: _imeta_kw = {} if _funcs is not None: _imeta_kw.setdefault("funcs", _funcs) if _target is not None: _imeta_kw.setdefault("target", _target) self._imeta = self.IMeta(self, **_imeta_kw) self.__dict__.update(kwargs) @property def _itarget(self): return self._imeta.target @classmethod def inject(cls, ns, ns_attr, funcs=None, **kwargs): kwargs["_imeta_kw"] = {"ns": ns, "ns_attr": ns_attr, "funcs": funcs} return cls(**kwargs) def _irelease(self): """Shortcut to `self._imeta.release`.""" self._imeta.release() self._imeta.reset() def _trace(self, *args, **kw): """Shortcut to `self._imeta.get_trace`.""" return self._imeta.get_trace(*args, **kw) def _traces(self, *args, **kw): """Shortcut to `self._imeta.get_traces`.""" return self._imeta.get_traces(*args, **kw) def __call__(self, *args, **kwargs): target = self._imeta.target if inspect.isclass(target): target = target(*args, **kwargs) return type(self)( target, _imeta_kw={"type_interface": self, "funcs": self._imeta.funcs}, ) if "__call__" in self._imeta.funcs: return self._imeta.call("__call__", args, kwargs) self._imeta.add("__call__", args, kwargs) return self._imeta.target(*args, **kwargs) def __getattr__(self, attr): if attr in self._imeta.funcs: return lambda *args, **kwargs: self._imeta.call(attr, args, kwargs) return getattr(self._imeta.target, attr) def __str__(self): iface = super().__str__() return f"{iface}::{self._imeta.target}" class File: def __init__(self, data=""): self.data = data def read(self): return self.data def write(self, data): self.data += data def close(self): self.data = None def __enter__(self): return self def __exit__(self, *_, **__): pass