sound check

This commit is contained in:
bkfox
2023-01-25 13:02:21 +01:00
parent 276e65e0b4
commit 9ec25ed109
13 changed files with 671 additions and 330 deletions

View File

@ -0,0 +1,2 @@
from .sound_file import *
from .sound_monitor import *

View File

@ -0,0 +1,73 @@
from datetime import timedelta
from django.conf import settings as conf
from django.test import TestCase
from django.utils import timezone as tz
from aircox import models
from aircox.management.sound_file import SoundFile
__all__ = ('SoundFileTestCase',)
class SoundFileTestCase(TestCase):
path_infos = {
'test/20220101_10h13_1_sample_1.mp3': {
'year': 2022, 'month': 1, 'day': 1, 'hour': 10, 'minute': 13,
'n': 1, 'name': 'Sample 1'},
'test/20220102_10h13_sample_2.mp3': {
'year': 2022, 'month': 1, 'day': 2, 'hour': 10, 'minute': 13,
'name': 'Sample 2'},
'test/20220103_1_sample_3.mp3': {
'year': 2022, 'month': 1, 'day': 3, 'n': 1, 'name': 'Sample 3'},
'test/20220104_sample_4.mp3': {
'year': 2022, 'month': 1, 'day': 4, 'name': 'Sample 4'},
'test/20220105.mp3': {
'year': 2022, 'month': 1, 'day': 5, 'name': '20220105'},
}
subdir_prefix = 'test'
sound_files = {k: r for k, r in (
(path, SoundFile(conf.MEDIA_ROOT + '/' + path))
for path in path_infos.keys()
)}
def test_sound_path(self):
for path, sound_file in self.sound_files.items():
self.assertEqual(path, sound_file.sound_path)
def test_read_path(self):
for path, sound_file in self.sound_files.items():
expected = self.path_infos[path]
result = sound_file.read_path(path)
# remove None values
result = {k: v for k, v in result.items() if v is not None}
self.assertEqual(expected, result, "path: {}".format(path))
def _setup_diff(self, program, info):
episode = models.Episode(program=program, title='test-episode')
at = tz.datetime(**{
k: info[k] for k in ('year', 'month', 'day', 'hour', 'minute')
if info.get(k)
})
at = tz.make_aware(at)
diff = models.Diffusion(episode=episode, start=at,
end=at+timedelta(hours=1))
episode.save()
diff.save()
return diff
def test_find_episode(self):
station = models.Station(name='test-station')
program = models.Program(station=station, title='test')
station.save()
program.save()
for path, sound_file in self.sound_files.items():
infos = sound_file.read_path(path)
diff = self._setup_diff(program, infos)
sound = models.Sound(program=diff.program, file=path)
result = sound_file.find_episode(sound, infos)
self.assertEquals(diff.episode, result)
# TODO: find_playlist, sync

View File

@ -0,0 +1,76 @@
import concurrent.futures as futures
from datetime import datetime, timedelta
import time
from django.test import TestCase
from aircox.management.sound_monitor import \
NotifyHandler, MoveHandler, ModifiedHandler, MonitorHandler
__all__ = ('NotifyHandlerTestCase', 'MoveHandlerTestCase',
'ModifiedHandlerTestCase', 'MonitorHandlerTestCase',)
class FakeEvent:
def __init__(self, **kwargs):
self.__dict__.update(**kwargs)
class WaitHandler(NotifyHandler):
def __call__(self, timeout=0.5, *args, **kwargs):
# using time.sleep make the future done directly, don't know why
start = datetime.now()
while datetime.now() - start < timedelta(seconds=4):
pass
class NotifyHandlerTestCase(TestCase):
pass
class MoveHandlerTestCase(TestCase):
pass
class ModifiedHandlerTestCase(TestCase):
def test_wait(self):
handler = ModifiedHandler()
handler.timeout_delta = timedelta(seconds=0.1)
start = datetime.now()
handler.wait()
delta = datetime.now() - start
self.assertTrue(delta < handler.timeout_delta + timedelta(seconds=0.1))
def test_wait_ping(self):
pool = futures.ThreadPoolExecutor(1)
handler = ModifiedHandler()
handler.timeout_delta = timedelta(seconds=0.5)
future = pool.submit(handler.wait)
time.sleep(0.3)
handler.ping()
time.sleep(0.3)
self.assertTrue(future.running())
class MonitorHandlerTestCase(TestCase):
def setUp(self):
pool = futures.ThreadPoolExecutor(2)
self.monitor = MonitorHandler('archives', pool)
def test_submit_new_job(self):
event = FakeEvent(src_path='dummy_src')
handler = NotifyHandler()
result = self.monitor._submit(handler, event, 'up')
self.assertIs(handler, result)
self.assertIsInstance(handler.future, futures.Future)
self.monitor.pool.shutdown()
def test_submit_job_exists(self):
event = FakeEvent(src_path='dummy_src')
job_1 = self.monitor._submit(WaitHandler(), event, 'up')
job_2 = self.monitor._submit(NotifyHandler(), event, 'up')
self.assertIs(job_1, job_2)
self.monitor.pool.shutdown()