forked from rc/aircox
		
	cfr #121 Co-authored-by: Christophe Siraut <d@tobald.eu.org> Co-authored-by: bkfox <thomas bkfox net> Co-authored-by: Thomas Kairos <thomas@bkfox.net> Reviewed-on: rc/aircox#131 Co-authored-by: Chris Tactic <ctactic@noreply.git.radiocampus.be> Co-committed-by: Chris Tactic <ctactic@noreply.git.radiocampus.be>
		
			
				
	
	
		
			276 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from concurrent import futures
 | 
						|
import pytest
 | 
						|
 | 
						|
from django.utils import timezone as tz
 | 
						|
 | 
						|
from aircox.conf import settings
 | 
						|
from aircox.models import Sound
 | 
						|
from aircox.controllers import sound_monitor
 | 
						|
from aircox.test import Interface, interface
 | 
						|
 | 
						|
 | 
						|
now = tz.datetime.now()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def event():
 | 
						|
    return Interface(src_path="/tmp/src_path", dest_path="/tmp/dest_path")
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def interfaces():
 | 
						|
    items = {
 | 
						|
        "SoundFile": Interface.inject(
 | 
						|
            sound_monitor,
 | 
						|
            "SoundFile",
 | 
						|
            {
 | 
						|
                "sync": None,
 | 
						|
            },
 | 
						|
        ),
 | 
						|
        "time": Interface.inject(
 | 
						|
            sound_monitor,
 | 
						|
            "time",
 | 
						|
            {
 | 
						|
                "sleep": None,
 | 
						|
            },
 | 
						|
        ),
 | 
						|
        "datetime": Interface.inject(sound_monitor, "datetime", {"now": now}),
 | 
						|
    }
 | 
						|
    yield items
 | 
						|
    for item in items.values():
 | 
						|
        item._irelease()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def task(interfaces):
 | 
						|
    return sound_monitor.Task()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def delete_task(interfaces):
 | 
						|
    return sound_monitor.DeleteTask()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def move_task(interfaces):
 | 
						|
    return sound_monitor.MoveTask()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def modified_task(interfaces):
 | 
						|
    return sound_monitor.ModifiedTask()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def monitor_handler(interfaces):
 | 
						|
    pool = Interface(
 | 
						|
        None,
 | 
						|
        {
 | 
						|
            "submit": lambda imeta, *a, **kw: Interface(
 | 
						|
                None,
 | 
						|
                {
 | 
						|
                    "add_done_callback": None,
 | 
						|
                    "done": False,
 | 
						|
                },
 | 
						|
            )
 | 
						|
        },
 | 
						|
    )
 | 
						|
    return sound_monitor.MonitorHandler("/tmp", pool=pool, sync_kw=13)
 | 
						|
 | 
						|
 | 
						|
class TestTask:
 | 
						|
    def test___init__(self, task):
 | 
						|
        assert task.timestamp is not None
 | 
						|
 | 
						|
    def test_ping(self, task):
 | 
						|
        task.timestamp = None
 | 
						|
        task.ping()
 | 
						|
        assert task.timestamp >= now
 | 
						|
 | 
						|
    @pytest.mark.django_db
 | 
						|
    def test___call__(self, logger, task, event):
 | 
						|
        task.log_msg = "--{event.src_path}--"
 | 
						|
        sound_file = task(event, logger=logger, kw=13)
 | 
						|
        assert sound_file._trace("sync", kw=True) == {"kw": 13}
 | 
						|
        assert logger._trace("info", args=True) == (task.log_msg.format(event=event),)
 | 
						|
 | 
						|
 | 
						|
class TestDeleteTask:
 | 
						|
    @pytest.mark.django_db
 | 
						|
    def test___call__(self, delete_task, logger, task, event):
 | 
						|
        sound_file = delete_task(event, logger=logger)
 | 
						|
        assert sound_file._trace("sync", kw=True) == {"deleted": True}
 | 
						|
 | 
						|
 | 
						|
class TestMoveTask:
 | 
						|
    @pytest.mark.django_db
 | 
						|
    def test__call___with_sound(self, move_task, sound, event, logger):
 | 
						|
        event.src_path = sound.file.name
 | 
						|
        sound_file = move_task(event, logger=logger)
 | 
						|
        assert isinstance(sound_file._trace("sync", kw="sound"), Sound)
 | 
						|
        assert sound_file.path == sound.file.name
 | 
						|
 | 
						|
    @pytest.mark.django_db
 | 
						|
    def test__call___no_sound(self, move_task, event, logger):
 | 
						|
        sound_file = move_task(event, logger=logger)
 | 
						|
        assert sound_file._trace("sync", kw=True) == {}
 | 
						|
        assert sound_file.path == event.dest_path
 | 
						|
 | 
						|
 | 
						|
class TestModifiedTask:
 | 
						|
    def test_wait(self, modified_task):
 | 
						|
        dt_now = now + modified_task.timeout_delta - tz.timedelta(hours=10)
 | 
						|
        datetime = Interface.inject(sound_monitor, "datetime", {"now": dt_now})
 | 
						|
 | 
						|
        def sleep(imeta, n):
 | 
						|
            datetime._imeta.funcs["now"] = modified_task.timestamp + tz.timedelta(hours=10)
 | 
						|
 | 
						|
        time = Interface.inject(sound_monitor, "time", {"sleep": sleep})
 | 
						|
        modified_task.wait()
 | 
						|
        assert time._trace("sleep", args=True)
 | 
						|
 | 
						|
        datetime._imeta.release()
 | 
						|
 | 
						|
    def test__call__(self, modified_task, event):
 | 
						|
        interface(modified_task, {"wait": None})
 | 
						|
        modified_task(event)
 | 
						|
        assert modified_task.calls["wait"]
 | 
						|
 | 
						|
 | 
						|
class TestMonitorHandler:
 | 
						|
    def test_on_created(self, monitor_handler, event):
 | 
						|
        monitor_handler._submit = monitor_handler.pool.submit
 | 
						|
        monitor_handler.on_created(event)
 | 
						|
        trace_args, trace_kwargs = monitor_handler.pool._trace("submit")
 | 
						|
        assert isinstance(trace_args[0], sound_monitor.CreateTask)
 | 
						|
        assert trace_args[1:] == (event, "new")
 | 
						|
        assert trace_kwargs == monitor_handler.sync_kw
 | 
						|
 | 
						|
    def test_on_deleted(self, monitor_handler, event):
 | 
						|
        monitor_handler._submit = monitor_handler.pool.submit
 | 
						|
        monitor_handler.on_deleted(event)
 | 
						|
        trace_args, _ = monitor_handler.pool._trace("submit")
 | 
						|
        assert isinstance(trace_args[0], sound_monitor.DeleteTask)
 | 
						|
        assert trace_args[1:] == (event, "del")
 | 
						|
 | 
						|
    def test_on_moved(self, monitor_handler, event):
 | 
						|
        monitor_handler._submit = monitor_handler.pool.submit
 | 
						|
        monitor_handler.on_moved(event)
 | 
						|
        trace_args, trace_kwargs = monitor_handler.pool._trace("submit")
 | 
						|
        assert isinstance(trace_args[0], sound_monitor.MoveTask)
 | 
						|
        assert trace_args[1:] == (event, "mv")
 | 
						|
        assert trace_kwargs == monitor_handler.sync_kw
 | 
						|
 | 
						|
    def test_on_modified(self, monitor_handler, event):
 | 
						|
        monitor_handler._submit = monitor_handler.pool.submit
 | 
						|
        monitor_handler.on_modified(event)
 | 
						|
        trace_args, trace_kwargs = monitor_handler.pool._trace("submit")
 | 
						|
        assert isinstance(trace_args[0], sound_monitor.ModifiedTask)
 | 
						|
        assert trace_args[1:] == (event, "up")
 | 
						|
        assert trace_kwargs == monitor_handler.sync_kw
 | 
						|
 | 
						|
    def test__submit(self, monitor_handler, event):
 | 
						|
        handler = Interface()
 | 
						|
        handler, created = monitor_handler._submit(handler, event, "prefix", kw=13)
 | 
						|
        assert created
 | 
						|
        assert handler.future._trace("add_done_callback")
 | 
						|
        assert monitor_handler.pool._trace("submit") == (
 | 
						|
            (handler, event),
 | 
						|
            {"kw": 13},
 | 
						|
        )
 | 
						|
 | 
						|
        key = f"prefix:{event.src_path}"
 | 
						|
        assert monitor_handler.jobs.get(key) == handler
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def monitor_interfaces():
 | 
						|
    items = {
 | 
						|
        "atexit": Interface.inject(sound_monitor, "atexit", {"register": None, "leave": None}),
 | 
						|
        "observer": Interface.inject(
 | 
						|
            sound_monitor,
 | 
						|
            "Observer",
 | 
						|
            {
 | 
						|
                "schedule": None,
 | 
						|
                "start": None,
 | 
						|
            },
 | 
						|
        ),
 | 
						|
    }
 | 
						|
    yield items
 | 
						|
    for item in items.values():
 | 
						|
        item.release()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def monitor():
 | 
						|
    yield sound_monitor.SoundMonitor()
 | 
						|
 | 
						|
 | 
						|
class TestSoundMonitor:
 | 
						|
    @pytest.mark.django_db
 | 
						|
    def test_report(self, monitor, program, logger):
 | 
						|
        monitor.report(program, "component", "content", logger=logger)
 | 
						|
        msg = f"{program}, component: content"
 | 
						|
        assert logger._trace("info", args=True) == (msg,)
 | 
						|
 | 
						|
    @pytest.mark.django_db
 | 
						|
    def test_scan(self, monitor, programs, logger):
 | 
						|
        interface = Interface(None, {"scan_for_program": None})
 | 
						|
        monitor.scan_for_program = interface.scan_for_program
 | 
						|
        dirs = monitor.scan(logger)
 | 
						|
 | 
						|
        assert logger._traces("info") == tuple(
 | 
						|
            [
 | 
						|
                (("scan all programs...",), {}),
 | 
						|
            ]
 | 
						|
            + [((f"#{program.id} {program.title}",), {}) for program in programs]
 | 
						|
        )
 | 
						|
        assert dirs == [program.abspath for program in programs]
 | 
						|
        traces = tuple(
 | 
						|
            [
 | 
						|
                [
 | 
						|
                    (
 | 
						|
                        (program, settings.SOUND_BROADCASTS_SUBDIR),
 | 
						|
                        {"logger": logger, "broadcast": True},
 | 
						|
                    ),
 | 
						|
                    (
 | 
						|
                        (program, settings.SOUND_EXCERPTS_SUBDIR),
 | 
						|
                        {"logger": logger, "broadcast": False},
 | 
						|
                    ),
 | 
						|
                ]
 | 
						|
                for program in programs
 | 
						|
            ]
 | 
						|
        )
 | 
						|
        traces_flat = tuple([item for sublist in traces for item in sublist])
 | 
						|
        assert interface._traces("scan_for_program") == traces_flat
 | 
						|
 | 
						|
    # TODO / FIXME
 | 
						|
    def broken_test_monitor(self, monitor, monitor_interfaces, logger):
 | 
						|
        def sleep(*args, **kwargs):
 | 
						|
            monitor.stop()
 | 
						|
 | 
						|
        time = Interface.inject(sound_monitor, "time", {"sleep": sleep})
 | 
						|
        monitor.monitor(logger=logger)
 | 
						|
        time._irelease()
 | 
						|
 | 
						|
        observers = monitor_interfaces["observer"].instances
 | 
						|
        observer = observers and observers[0]
 | 
						|
        assert observer
 | 
						|
        schedules = observer._traces("schedule")
 | 
						|
        for (handler, *_), kwargs in schedules:
 | 
						|
            breakpoint()
 | 
						|
            assert isinstance(handler, sound_monitor.MonitorHandler)
 | 
						|
            assert isinstance(handler.pool, futures.ThreadPoolExecutor)
 | 
						|
            assert (handler.subdir, handler.type) in (
 | 
						|
                (settings.SOUND_ARCHIVES_SUBDIR, Sound.TYPE_ARCHIVE),
 | 
						|
                (settings.SOUND_EXCERPTS_SUBDIR, Sound.TYPE_EXCERPT),
 | 
						|
            )
 | 
						|
 | 
						|
        assert observer._trace("start")
 | 
						|
 | 
						|
        atexit = monitor_interfaces["atexit"]
 | 
						|
        assert atexit._trace("register")
 | 
						|
        assert atexit._trace("unregister")
 | 
						|
 | 
						|
        assert observers
 |