import concurrent.futures as futures from datetime import datetime, timedelta import time from django.test import TestCase from aircox.management.sound_monitor import \ NotifyHandler, MoveHandler, ModifiedHandler, MonitorHandler __all__ = ('NotifyHandlerTestCase', 'MoveHandlerTestCase', 'ModifiedHandlerTestCase', 'MonitorHandlerTestCase',) class FakeEvent: def __init__(self, **kwargs): self.__dict__.update(**kwargs) class WaitHandler(NotifyHandler): def __call__(self, timeout=0.5, *args, **kwargs): # using time.sleep make the future done directly, don't know why start = datetime.now() while datetime.now() - start < timedelta(seconds=4): pass class NotifyHandlerTestCase(TestCase): pass class MoveHandlerTestCase(TestCase): pass class ModifiedHandlerTestCase(TestCase): def test_wait(self): handler = ModifiedHandler() handler.timeout_delta = timedelta(seconds=0.1) start = datetime.now() handler.wait() delta = datetime.now() - start self.assertTrue(delta < handler.timeout_delta + timedelta(seconds=0.1)) def test_wait_ping(self): pool = futures.ThreadPoolExecutor(1) handler = ModifiedHandler() handler.timeout_delta = timedelta(seconds=0.5) future = pool.submit(handler.wait) time.sleep(0.3) handler.ping() time.sleep(0.3) self.assertTrue(future.running()) class MonitorHandlerTestCase(TestCase): def setUp(self): pool = futures.ThreadPoolExecutor(2) self.monitor = MonitorHandler('archives', pool) def test_submit_new_job(self): event = FakeEvent(src_path='dummy_src') handler = NotifyHandler() result, _ = self.monitor._submit(handler, event, 'up') self.assertIs(handler, result) self.assertIsInstance(handler.future, futures.Future) self.monitor.pool.shutdown() def test_submit_job_exists(self): event = FakeEvent(src_path='dummy_src') job_1, new_1 = self.monitor._submit(WaitHandler(), event, 'up') job_2, new_2 = self.monitor._submit(NotifyHandler(), event, 'up') self.assertIs(job_1, job_2) self.assertTrue(new_1) self.assertFalse(new_2) self.monitor.pool.shutdown()