Files
aircox-radiocampus/aircox/tests/controllers/test_sound_monitor.py
2023-06-21 22:33:37 +02:00

275 lines
8.2 KiB
Python

from concurrent import futures
import logging
import pytest
from django.utils import timezone as tz
from aircox.conf import settings
from aircox.models import Sound
from aircox.controllers import sound_monitor
from aircox.test import Interface, interface
# Reference timestamp taken once at import time. The `interfaces` fixture
# uses it as the value of the patched `datetime` "now" entry, and tests
# compare task timestamps against it.
now = tz.datetime.now()
@pytest.fixture
def event():
    """Provide a stand-in event object carrying source and destination paths."""
    paths = {"src_path": "/tmp/src_path", "dest_path": "/tmp/dest_path"}
    return Interface(**paths)
@pytest.fixture
def logger():
    """Provide an ``Interface`` over ``logging`` with the level methods stubbed.

    ``info``, ``debug``, ``error`` and ``warning`` are each mapped to ``None``.
    """
    stubbed_levels = dict.fromkeys(("info", "debug", "error", "warning"))
    return Interface(logging, stubbed_levels)
@pytest.fixture
def interfaces():
    """Inject stand-ins for ``SoundFile``, ``time`` and ``datetime``.

    The interfaces are injected into the ``sound_monitor`` module, yielded as
    a dict keyed by attribute name, and released once the test is over.
    """
    # (attribute patched in ``sound_monitor``, stubbed functions), in
    # injection order.
    specs = (
        ("SoundFile", {"sync": None}),
        ("time", {"sleep": None}),
        ("datetime", {"now": now}),
    )
    items = {
        attr: Interface.inject(sound_monitor, attr, funcs)
        for attr, funcs in specs
    }
    yield items
    for injected in items.values():
        injected._irelease()
@pytest.fixture
def task(interfaces):
    """Build a ``sound_monitor.Task`` once the ``interfaces`` fixture is set up."""
    instance = sound_monitor.Task()
    return instance
@pytest.fixture
def delete_task(interfaces):
    """Build a ``sound_monitor.DeleteTask`` once ``interfaces`` is set up."""
    instance = sound_monitor.DeleteTask()
    return instance
@pytest.fixture
def move_task(interfaces):
    """Build a ``sound_monitor.MoveTask`` once ``interfaces`` is set up."""
    instance = sound_monitor.MoveTask()
    return instance
@pytest.fixture
def modified_task(interfaces):
    """Build a ``sound_monitor.ModifiedTask`` once ``interfaces`` is set up."""
    instance = sound_monitor.ModifiedTask()
    return instance
@pytest.fixture
def monitor_handler(interfaces):
    """Build a ``MonitorHandler`` on ``/tmp`` backed by a stub pool.

    The pool's ``submit`` returns a fresh stub exposing ``add_done_callback``
    and ``done`` entries on every call.
    """

    def submit(imeta, *a, **kw):
        # Stand-in for the object a pool would hand back on submit.
        return Interface(None, {"add_done_callback": None, "done": False})

    pool = Interface(None, {"submit": submit})
    return sound_monitor.MonitorHandler("/tmp", pool=pool, sync_kw=13)
class TestTask:
    """Tests for ``sound_monitor.Task``."""

    def test___init__(self, task):
        """A new task has a timestamp set."""
        stamp = task.timestamp
        assert stamp is not None

    def test_ping(self, task):
        """``ping`` sets the timestamp to a value not earlier than ``now``."""
        task.timestamp = None
        task.ping()
        assert task.timestamp >= now

    @pytest.mark.django_db
    def test___call__(self, logger, task, event):
        """Calling the task forwards kwargs to ``sync`` and logs ``log_msg``."""
        task.log_msg = "--{event.src_path}--"
        result = task(event, logger=logger, kw=13)

        sync_kwargs = result._trace("sync", kw=True)
        assert sync_kwargs == {"kw": 13}

        info_args = logger._trace("info", args=True)
        assert info_args == (task.log_msg.format(event=event),)
class TestDeleteTask:
    """Tests for ``sound_monitor.DeleteTask``."""

    @pytest.mark.django_db
    def test___call__(self, delete_task, logger, task, event):
        """Calling the task syncs the sound file with ``deleted=True``."""
        result = delete_task(event, logger=logger)
        sync_kwargs = result._trace("sync", kw=True)
        assert sync_kwargs == {"deleted": True}
class TestMoveTask:
    """Tests for ``sound_monitor.MoveTask``."""

    @pytest.mark.django_db
    def test__call___with_sound(self, move_task, sound, event, logger):
        """When the source path is the sound's file name, ``sync`` gets it."""
        source_name = sound.file.name
        event.src_path = source_name
        result = move_task(event, logger=logger)

        synced = result._trace("sync", kw="sound")
        assert isinstance(synced, Sound)
        assert result.path == sound.file.name

    @pytest.mark.django_db
    def test__call___no_sound(self, move_task, event, logger):
        """With the default event, ``sync`` gets no kwargs and the path is
        the event's destination path."""
        result = move_task(event, logger=logger)

        sync_kwargs = result._trace("sync", kw=True)
        assert sync_kwargs == {}
        assert result.path == event.dest_path
class TestModifiedTask:
    """Tests for ``sound_monitor.ModifiedTask``."""

    def test_wait(self, modified_task):
        """``wait`` calls ``time.sleep`` at least once.

        Both ``datetime`` and ``time`` are re-patched locally on
        ``sound_monitor``; the patches are released in a ``finally`` block so
        that a failing ``wait()`` or assertion cannot leave them installed.
        """
        # First value served by the patched "now": ten hours short of
        # ``now + timeout_delta``.
        dt_now = now + modified_task.timeout_delta - tz.timedelta(hours=10)
        datetime = Interface.inject(sound_monitor, "datetime", {"now": dt_now})

        def sleep(imeta, n):
            # Move the patched "now" ten hours past the task timestamp;
            # presumably this is what lets ``wait`` return -- confirm against
            # ``ModifiedTask.wait``.
            datetime._imeta.funcs[
                "now"
            ] = modified_task.timestamp + tz.timedelta(hours=10)

        time = Interface.inject(sound_monitor, "time", {"sleep": sleep})
        try:
            modified_task.wait()
            assert time._trace("sleep", args=True)
        finally:
            # Release both local patches, also on failure.
            time._irelease()
            datetime._irelease()

    def test__call__(self, modified_task, event):
        """Calling the task records a call to ``wait``."""
        interface(modified_task, {"wait": None})
        modified_task(event)
        assert modified_task.calls["wait"]
class TestMonitorHandler:
    """Tests for ``sound_monitor.MonitorHandler`` event hooks and ``_submit``."""

    @staticmethod
    def _fire(handler, hook_name, event):
        """Route ``_submit`` to the stub pool, call the hook, and return the
        ``submit`` trace of the pool."""
        handler._submit = handler.pool.submit
        getattr(handler, hook_name)(event)
        return handler.pool._trace("submit")

    def test_on_created(self, monitor_handler, event):
        """``on_created`` submits a ``CreateTask`` under the "new" prefix."""
        args, kwargs = self._fire(monitor_handler, "on_created", event)
        assert isinstance(args[0], sound_monitor.CreateTask)
        assert args[1:] == (event, "new")
        assert kwargs == monitor_handler.sync_kw

    def test_on_deleted(self, monitor_handler, event):
        """``on_deleted`` submits a ``DeleteTask`` under the "del" prefix."""
        args, _ = self._fire(monitor_handler, "on_deleted", event)
        assert isinstance(args[0], sound_monitor.DeleteTask)
        assert args[1:] == (event, "del")

    def test_on_moved(self, monitor_handler, event):
        """``on_moved`` submits a ``MoveTask`` under the "mv" prefix."""
        args, kwargs = self._fire(monitor_handler, "on_moved", event)
        assert isinstance(args[0], sound_monitor.MoveTask)
        assert args[1:] == (event, "mv")
        assert kwargs == monitor_handler.sync_kw

    def test_on_modified(self, monitor_handler, event):
        """``on_modified`` submits a ``ModifiedTask`` under the "up" prefix."""
        args, kwargs = self._fire(monitor_handler, "on_modified", event)
        assert isinstance(args[0], sound_monitor.ModifiedTask)
        assert args[1:] == (event, "up")
        assert kwargs == monitor_handler.sync_kw

    def test__submit(self, monitor_handler, event):
        """``_submit`` hands the job to the pool and stores it in ``jobs``."""
        stub = Interface()
        job, created = monitor_handler._submit(stub, event, "prefix", kw=13)
        assert created
        assert job.future._trace("add_done_callback")

        expected_call = ((job, event), {"kw": 13})
        assert monitor_handler.pool._trace("submit") == expected_call

        job_key = f"prefix:{event.src_path}"
        assert monitor_handler.jobs.get(job_key) == job
@pytest.fixture
def monitor_interfaces():
    """Inject stand-ins for ``atexit`` and ``Observer`` into ``sound_monitor``.

    Yields the injected interfaces keyed by ``"atexit"`` and ``"observer"``,
    then releases them once the test is over.
    """
    items = {
        "atexit": Interface.inject(
            sound_monitor,
            "atexit",
            # "unregister" is stubbed because ``test_monitor`` asserts a
            # trace for it; "leave" is kept from the original stub set.
            {"register": None, "unregister": None, "leave": None},
        ),
        "observer": Interface.inject(
            sound_monitor,
            "Observer",
            {
                "schedule": None,
                "start": None,
            },
        ),
    }
    yield items
    for item in items.values():
        # Same release call the ``interfaces`` fixture uses.
        item._irelease()
@pytest.fixture
def monitor():
    """Yield a fresh ``sound_monitor.SoundMonitor`` instance."""
    instance = sound_monitor.SoundMonitor()
    yield instance
# NOTE(review): pytest's default ``python_classes`` pattern only collects
# classes whose name starts with ``Test``. Unless the project overrides that
# setting, the tests below are never run -- consider renaming this class to
# ``TestSoundMonitor``.
class SoundMonitor:
    """Tests for ``sound_monitor.SoundMonitor``."""

    def test_report(self, monitor, program, logger):
        """``report`` logs "<program>, <component>: <content>" at info level."""
        monitor.report(program, "component", "content", logger=logger)
        msg = f"{program}, component: content"
        assert logger._trace("info", args=True) == (msg,)

    def test_scan(self, monitor, program, logger):
        """``scan`` logs its progress, returns the program directories and
        calls ``scan_for_program`` for the archives and excerpts subdirs."""
        interface = Interface(None, {"scan_for_program": None})
        monitor.scan_for_program = interface.scan_for_program
        dirs = monitor.scan(logger)
        assert logger._traces("info") == (
            "scan all programs...",
            f"#{program.id} {program.title}",
        )
        assert dirs == [program.abspath]
        # Two expected calls. The original lacked the comma between them,
        # which made the expression call a tuple and raise TypeError.
        assert interface._traces("scan_for_program") == (
            ((program, settings.SOUND_ARCHIVES_SUBDIR), {"logger": logger}),
            ((program, settings.SOUND_EXCERPTS_SUBDIR), {"logger": logger}),
        )

    def test_monitor(self, monitor, monitor_interfaces, logger):
        """``monitor`` schedules handlers on an observer, starts it, and
        registers/unregisters with ``atexit``."""

        def sleep(*args, **kwargs):
            # Stop the monitor the first time it sleeps.
            monitor.stop()

        time = Interface.inject(sound_monitor, "time", {"sleep": sleep})
        try:
            monitor.monitor(logger=logger)
        finally:
            # Release the patch even if ``monitor`` raises.
            time._irelease()

        observers = monitor_interfaces["observer"].instances
        observer = observers and observers[0]
        assert observer

        schedules = observer._traces("schedule")
        for (handler, *_), kwargs in schedules:
            assert isinstance(handler, sound_monitor.MonitorHandler)
            assert isinstance(handler.pool, futures.ThreadPoolExecutor)
            assert (handler.subdir, handler.type) in (
                (settings.SOUND_ARCHIVES_SUBDIR, Sound.TYPE_ARCHIVE),
                (settings.SOUND_EXCERPTS_SUBDIR, Sound.TYPE_EXCERPT),
            )
        assert observer._trace("start")

        atexit = monitor_interfaces["atexit"]
        assert atexit._trace("register")
        assert atexit._trace("unregister")
        assert observers