"""Tests for the tasks, handler and monitor of ``aircox.controllers.sound_monitor``.

Collaborators of the module under test (``SoundFile``, ``time``, ``datetime``,
``atexit``, ``Observer``) are replaced through ``Interface.inject`` and
inspected afterwards with ``_trace`` / ``_traces``.
"""
from concurrent import futures

import pytest
from django.utils import timezone as tz

from aircox.conf import settings
from aircox.models import Sound
from aircox.controllers import sound_monitor
from aircox.test import Interface, interface


# Reference time captured at import; the ``interfaces`` fixture makes
# ``sound_monitor.datetime.now`` return this value.
now = tz.datetime.now()


@pytest.fixture
def event():
    """Return a stand-in filesystem event exposing ``src_path`` and ``dest_path``."""
    return Interface(src_path="/tmp/src_path", dest_path="/tmp/dest_path")


@pytest.fixture
def interfaces():
    """Inject ``SoundFile``, ``time`` and ``datetime`` stand-ins into ``sound_monitor``.

    Yields the injected interfaces keyed by name, then releases each of them.
    """
    items = {
        "SoundFile": Interface.inject(
            sound_monitor,
            "SoundFile",
            {
                "sync": None,
            },
        ),
        "time": Interface.inject(
            sound_monitor,
            "time",
            {
                "sleep": None,
            },
        ),
        "datetime": Interface.inject(sound_monitor, "datetime", {"now": now}),
    }
    yield items
    for item in items.values():
        item._irelease()


@pytest.fixture
def task(interfaces):
    """Return a ``sound_monitor.Task`` built while the stand-ins are injected."""
    return sound_monitor.Task()


@pytest.fixture
def delete_task(interfaces):
    """Return a ``sound_monitor.DeleteTask`` built while the stand-ins are injected."""
    return sound_monitor.DeleteTask()


@pytest.fixture
def move_task(interfaces):
    """Return a ``sound_monitor.MoveTask`` built while the stand-ins are injected."""
    return sound_monitor.MoveTask()


@pytest.fixture
def modified_task(interfaces):
    """Return a ``sound_monitor.ModifiedTask`` built while the stand-ins are injected."""
    return sound_monitor.ModifiedTask()


@pytest.fixture
def monitor_handler(interfaces):
    """Return a ``MonitorHandler`` whose pool is a traced stand-in.

    ``pool.submit`` returns a fresh stand-in future exposing
    ``add_done_callback`` and ``done``.
    """
    pool = Interface(
        None,
        {
            "submit": lambda imeta, *a, **kw: Interface(
                None,
                {
                    "add_done_callback": None,
                    "done": False,
                },
            )
        },
    )
    return sound_monitor.MonitorHandler("/tmp", pool=pool, sync_kw=13)


class TestTask:
    """Tests for ``sound_monitor.Task``."""

    def test___init__(self, task):
        assert task.timestamp is not None

    def test_ping(self, task):
        task.timestamp = None
        task.ping()
        assert task.timestamp >= now

    @pytest.mark.django_db
    def test___call__(self, logger, task, event):
        task.log_msg = "--{event.src_path}--"
        sound_file = task(event, logger=logger, kw=13)
        assert sound_file._trace("sync", kw=True) == {"kw": 13}
        assert logger._trace("info", args=True) == (task.log_msg.format(event=event),)


class TestDeleteTask:
    """Tests for ``sound_monitor.DeleteTask``."""

    @pytest.mark.django_db
    def test___call__(self, delete_task, logger, task, event):
        sound_file = delete_task(event, logger=logger)
        assert sound_file._trace("sync", kw=True) == {"deleted": True}


class TestMoveTask:
    """Tests for ``sound_monitor.MoveTask``."""

    @pytest.mark.django_db
    def test__call___with_sound(self, move_task, sound, event, logger):
        event.src_path = sound.file.name
        sound_file = move_task(event, logger=logger)
        assert isinstance(sound_file._trace("sync", kw="sound"), Sound)
        assert sound_file.path == sound.file.name

    @pytest.mark.django_db
    def test__call___no_sound(self, move_task, event, logger):
        sound_file = move_task(event, logger=logger)
        assert sound_file._trace("sync", kw=True) == {}
        assert sound_file.path == event.dest_path


class TestModifiedTask:
    """Tests for ``sound_monitor.ModifiedTask``."""

    def test_wait(self, modified_task):
        dt_now = now + modified_task.timeout_delta - tz.timedelta(hours=10)
        datetime = Interface.inject(sound_monitor, "datetime", {"now": dt_now})

        def sleep(imeta, n):
            # Move the spoofed clock forward on the first sleep.
            datetime._imeta.funcs["now"] = modified_task.timestamp + tz.timedelta(hours=10)

        time = Interface.inject(sound_monitor, "time", {"sleep": sleep})
        try:
            modified_task.wait()
            assert time._trace("sleep", args=True)
        finally:
            # Release both test-local injections even when the test fails, so
            # they do not stay installed in ``sound_monitor`` for later tests.
            time._irelease()
            datetime._irelease()

    def test__call__(self, modified_task, event):
        interface(modified_task, {"wait": None})
        modified_task(event)
        assert modified_task.calls["wait"]


class TestMonitorHandler:
    """Tests for ``sound_monitor.MonitorHandler`` event callbacks and ``_submit``."""

    def test_on_created(self, monitor_handler, event):
        # Route ``_submit`` straight to the traced pool to inspect its arguments.
        monitor_handler._submit = monitor_handler.pool.submit
        monitor_handler.on_created(event)
        trace_args, trace_kwargs = monitor_handler.pool._trace("submit")
        assert isinstance(trace_args[0], sound_monitor.CreateTask)
        assert trace_args[1:] == (event, "new")
        assert trace_kwargs == monitor_handler.sync_kw

    def test_on_deleted(self, monitor_handler, event):
        monitor_handler._submit = monitor_handler.pool.submit
        monitor_handler.on_deleted(event)
        trace_args, _ = monitor_handler.pool._trace("submit")
        assert isinstance(trace_args[0], sound_monitor.DeleteTask)
        assert trace_args[1:] == (event, "del")

    def test_on_moved(self, monitor_handler, event):
        monitor_handler._submit = monitor_handler.pool.submit
        monitor_handler.on_moved(event)
        trace_args, trace_kwargs = monitor_handler.pool._trace("submit")
        assert isinstance(trace_args[0], sound_monitor.MoveTask)
        assert trace_args[1:] == (event, "mv")
        assert trace_kwargs == monitor_handler.sync_kw

    def test_on_modified(self, monitor_handler, event):
        monitor_handler._submit = monitor_handler.pool.submit
        monitor_handler.on_modified(event)
        trace_args, trace_kwargs = monitor_handler.pool._trace("submit")
        assert isinstance(trace_args[0], sound_monitor.ModifiedTask)
        assert trace_args[1:] == (event, "up")
        assert trace_kwargs == monitor_handler.sync_kw

    def test__submit(self, monitor_handler, event):
        handler = Interface()
        handler, created = monitor_handler._submit(handler, event, "prefix", kw=13)
        assert created
        assert handler.future._trace("add_done_callback")
        assert monitor_handler.pool._trace("submit") == (
            (handler, event),
            {"kw": 13},
        )

        key = f"prefix:{event.src_path}"
        assert monitor_handler.jobs.get(key) == handler


@pytest.fixture
def monitor_interfaces():
    """Inject ``atexit`` and ``Observer`` stand-ins into ``sound_monitor``.

    Yields the injected interfaces keyed by name, then releases each of them.
    """
    # NOTE(review): only "register" and "leave" are spoofed on atexit, while
    # ``broken_test_monitor`` asserts a trace for "unregister" -- confirm which
    # name is intended.
    items = {
        "atexit": Interface.inject(sound_monitor, "atexit", {"register": None, "leave": None}),
        "observer": Interface.inject(
            sound_monitor,
            "Observer",
            {
                "schedule": None,
                "start": None,
            },
        ),
    }
    yield items
    for item in items.values():
        # Same teardown call as the ``interfaces`` fixture.
        item._irelease()


@pytest.fixture
def monitor():
    """Yield a fresh ``sound_monitor.SoundMonitor``."""
    yield sound_monitor.SoundMonitor()


class TestSoundMonitor:
    """Tests for ``sound_monitor.SoundMonitor``."""

    @pytest.mark.django_db
    def test_report(self, monitor, program, logger):
        monitor.report(program, "component", "content", logger=logger)
        msg = f"{program}, component: content"
        assert logger._trace("info", args=True) == (msg,)

    @pytest.mark.django_db
    def test_scan(self, monitor, programs, logger):
        # Named ``spy`` so it does not shadow the imported ``interface`` helper.
        spy = Interface(None, {"scan_for_program": None})
        monitor.scan_for_program = spy.scan_for_program
        dirs = monitor.scan(logger)

        assert logger._traces("info") == tuple(
            [
                (("scan all programs...",), {}),
            ]
            + [((f"#{program.id} {program.title}",), {}) for program in programs]
        )
        assert dirs == [program.abspath for program in programs]
        traces = tuple(
            [
                [
                    (
                        (program, settings.SOUND_BROADCASTS_SUBDIR),
                        {"logger": logger, "broadcast": True},
                    ),
                    (
                        (program, settings.SOUND_EXCERPTS_SUBDIR),
                        {"logger": logger, "broadcast": False},
                    ),
                ]
                for program in programs
            ]
        )
        traces_flat = tuple([item for sublist in traces for item in sublist])
        assert spy._traces("scan_for_program") == traces_flat

    # TODO / FIXME: the ``broken_`` prefix keeps pytest from collecting this
    # test; rename it back to ``test_monitor`` once it is fixed.
    def broken_test_monitor(self, monitor, monitor_interfaces, logger):
        def sleep(*args, **kwargs):
            monitor.stop()

        time = Interface.inject(sound_monitor, "time", {"sleep": sleep})
        try:
            monitor.monitor(logger=logger)
        finally:
            # Release the spoofed ``time`` even if ``monitor()`` raises.
            time._irelease()

        observers = monitor_interfaces["observer"].instances
        observer = observers and observers[0]
        assert observer
        schedules = observer._traces("schedule")
        for (handler, *_), _kwargs in schedules:
            assert isinstance(handler, sound_monitor.MonitorHandler)
            assert isinstance(handler.pool, futures.ThreadPoolExecutor)
            assert (handler.subdir, handler.type) in (
                (settings.SOUND_ARCHIVES_SUBDIR, Sound.TYPE_ARCHIVE),
                (settings.SOUND_EXCERPTS_SUBDIR, Sound.TYPE_EXCERPT),
            )

        assert observer._trace("start")

        atexit = monitor_interfaces["atexit"]
        assert atexit._trace("register")
        assert atexit._trace("unregister")

        assert observers