forked from rc/aircox
85 lines
2.3 KiB
Python
85 lines
2.3 KiB
Python
import concurrent.futures as futures
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.test import TestCase
|
|
|
|
from aircox.management.sound_monitor import (
|
|
ModifiedHandler,
|
|
MonitorHandler,
|
|
NotifyHandler,
|
|
)
|
|
|
|
__all__ = (
|
|
"NotifyHandlerTestCase",
|
|
"MoveHandlerTestCase",
|
|
"ModifiedHandlerTestCase",
|
|
"MonitorHandlerTestCase",
|
|
)
|
|
|
|
|
|
class FakeEvent:
|
|
def __init__(self, **kwargs):
|
|
self.__dict__.update(**kwargs)
|
|
|
|
|
|
class WaitHandler(NotifyHandler):
|
|
def __call__(self, timeout=0.5, *args, **kwargs):
|
|
# using time.sleep make the future done directly, don't know why
|
|
start = datetime.now()
|
|
while datetime.now() - start < timedelta(seconds=4):
|
|
pass
|
|
|
|
|
|
class NotifyHandlerTestCase(TestCase):
|
|
pass
|
|
|
|
|
|
class MoveHandlerTestCase(TestCase):
|
|
pass
|
|
|
|
|
|
class ModifiedHandlerTestCase(TestCase):
|
|
def test_wait(self):
|
|
handler = ModifiedHandler()
|
|
handler.timeout_delta = timedelta(seconds=0.1)
|
|
start = datetime.now()
|
|
handler.wait()
|
|
delta = datetime.now() - start
|
|
self.assertTrue(delta < handler.timeout_delta + timedelta(seconds=0.1))
|
|
|
|
def test_wait_ping(self):
|
|
pool = futures.ThreadPoolExecutor(1)
|
|
handler = ModifiedHandler()
|
|
handler.timeout_delta = timedelta(seconds=0.5)
|
|
|
|
future = pool.submit(handler.wait)
|
|
time.sleep(0.3)
|
|
handler.ping()
|
|
time.sleep(0.3)
|
|
self.assertTrue(future.running())
|
|
|
|
|
|
class MonitorHandlerTestCase(TestCase):
|
|
def setUp(self):
|
|
pool = futures.ThreadPoolExecutor(2)
|
|
self.monitor = MonitorHandler("archives", pool)
|
|
|
|
def test_submit_new_job(self):
|
|
event = FakeEvent(src_path="dummy_src")
|
|
handler = NotifyHandler()
|
|
result, _ = self.monitor._submit(handler, event, "up")
|
|
self.assertIs(handler, result)
|
|
self.assertIsInstance(handler.future, futures.Future)
|
|
self.monitor.pool.shutdown()
|
|
|
|
def test_submit_job_exists(self):
|
|
event = FakeEvent(src_path="dummy_src")
|
|
|
|
job_1, new_1 = self.monitor._submit(WaitHandler(), event, "up")
|
|
job_2, new_2 = self.monitor._submit(NotifyHandler(), event, "up")
|
|
self.assertIs(job_1, job_2)
|
|
self.assertTrue(new_1)
|
|
self.assertFalse(new_2)
|
|
self.monitor.pool.shutdown()
|