aircox-radiocampus/aircox/tests/management/sound_monitor.py
2023-01-25 16:21:19 +01:00

79 lines
2.3 KiB
Python

import concurrent.futures as futures
from datetime import datetime, timedelta
import time
from django.test import TestCase
from aircox.management.sound_monitor import \
NotifyHandler, MoveHandler, ModifiedHandler, MonitorHandler
# Test case classes exported by this module.
__all__ = (
    'NotifyHandlerTestCase',
    'MoveHandlerTestCase',
    'ModifiedHandlerTestCase',
    'MonitorHandlerTestCase',
)
class FakeEvent:
    """Bare event stand-in: each keyword argument becomes an attribute."""

    def __init__(self, **kwargs):
        # Copy the keyword arguments straight into the instance namespace.
        vars(self).update(kwargs)
class WaitHandler(NotifyHandler):
    """Handler that spins for four seconds when called.

    ``timeout`` and any extra arguments are accepted but ignored.
    """

    def __call__(self, timeout=0.5, *args, **kwargs):
        # Deliberate busy-wait on the wall clock: the original author
        # observed that with time.sleep the future was reported done
        # immediately (cause unknown).
        deadline = datetime.now() + timedelta(seconds=4)
        while datetime.now() < deadline:
            pass
class NotifyHandlerTestCase(TestCase):
    """Placeholder: no tests are defined for NotifyHandler yet."""
class MoveHandlerTestCase(TestCase):
    """Placeholder: no tests are defined for MoveHandler yet."""
class ModifiedHandlerTestCase(TestCase):
    """Timing tests for ``ModifiedHandler.wait`` and ``ModifiedHandler.ping``."""

    def test_wait(self):
        """``wait()`` returns within ``timeout_delta`` plus a 0.1 s margin."""
        handler = ModifiedHandler()
        handler.timeout_delta = timedelta(seconds=0.1)

        start = datetime.now()
        handler.wait()
        delta = datetime.now() - start

        # assertLess reports both operands on failure, unlike assertTrue.
        self.assertLess(
            delta, handler.timeout_delta + timedelta(seconds=0.1))

    def test_wait_ping(self):
        """A ``ping()`` during ``wait()`` keeps it running past 0.5 s."""
        pool = futures.ThreadPoolExecutor(1)
        # Release the worker thread whether or not the assertion passes;
        # previously the executor was never shut down.
        self.addCleanup(pool.shutdown)

        handler = ModifiedHandler()
        handler.timeout_delta = timedelta(seconds=0.5)

        future = pool.submit(handler.wait)
        time.sleep(0.3)
        handler.ping()
        time.sleep(0.3)
        # 0.6 s after submission (beyond the initial 0.5 s timeout) the
        # wait is expected to still be in progress because of the ping.
        self.assertTrue(future.running())
class MonitorHandlerTestCase(TestCase):
    """Tests for ``MonitorHandler._submit``."""

    def setUp(self):
        """Create a monitor backed by a two-worker thread pool."""
        pool = futures.ThreadPoolExecutor(2)
        self.monitor = MonitorHandler('archives', pool)
        # Shut the pool down after every test, pass or fail. Previously the
        # shutdown call sat at the end of each test body and was skipped
        # whenever an assertion failed, leaking the worker threads.
        self.addCleanup(self.monitor.pool.shutdown)

    def test_submit_new_job(self):
        """Submitting a handler returns it and attaches a future to it."""
        event = FakeEvent(src_path='dummy_src')
        handler = NotifyHandler()

        result, _ = self.monitor._submit(handler, event, 'up')

        self.assertIs(handler, result)
        self.assertIsInstance(handler.future, futures.Future)

    def test_submit_job_exists(self):
        """A second submit for the same event returns the first job."""
        event = FakeEvent(src_path='dummy_src')

        # WaitHandler is still busy when the second submit is made.
        job_1, new_1 = self.monitor._submit(WaitHandler(), event, 'up')
        job_2, new_2 = self.monitor._submit(NotifyHandler(), event, 'up')

        self.assertIs(job_1, job_2)
        self.assertTrue(new_1)
        self.assertFalse(new_2)