From b2f4bce7179ff2e10efcf830fe84fd28e29e6c96 Mon Sep 17 00:00:00 2001 From: bkfox Date: Thu, 22 Jun 2023 23:29:51 +0200 Subject: [PATCH] tests: playlist_import, log_archiver, sound_stats.SoxStats --- aircox/controllers/log_archiver.py | 14 +- aircox/controllers/sound_stats.py | 44 ++++--- aircox/test.py | 23 +++- aircox/tests/conftest.py | 16 +++ aircox/tests/controllers/playlist.csv | 3 + aircox/tests/controllers/test_log_archiver.py | 123 ++++++++++++++---- .../tests/controllers/test_playlist_import.py | 66 ++++++++-- aircox/tests/controllers/test_sound_stats.py | 75 ++++++++--- 8 files changed, 282 insertions(+), 82 deletions(-) create mode 100644 aircox/tests/controllers/playlist.csv diff --git a/aircox/controllers/log_archiver.py b/aircox/controllers/log_archiver.py index 9984350..18a388e 100644 --- a/aircox/controllers/log_archiver.py +++ b/aircox/controllers/log_archiver.py @@ -42,6 +42,7 @@ class LogArchiver: # exists yet <3 for (station, date), logs in logs.items(): path = self.get_path(station, date) + # FIXME: remove binary mode with gzip.open(path, "ab") as archive: data = yaml.dump( [self.serialize(line) for line in logs] @@ -60,11 +61,8 @@ class LogArchiver: qs = qs.order_by("date") logs = {} for log in qs: - key = (log.station, log.date) - if key not in logs: - logs[key] = [log] - else: - logs[key].append(log) + key = (log.station, log.date.date()) + logs.setdefault(key, []).append(log) return logs def serialize(self, log): @@ -73,13 +71,13 @@ class LogArchiver: def load(self, station, date): """Load an archive returning logs in a list.""" - from aircox.models import Log - path = self.get_path(station, date) if not os.path.exists(path): return [] + return self.load_file(path) + def load_file(self, path): with gzip.open(path, "rb") as archive: data = archive.read() logs = yaml.load(data) @@ -110,5 +108,5 @@ class LogArchiver: """From a list of dict representing logs, retrieve related objects of the given type.""" attr_id = attr + "_id" - pks = (log[attr_id] for log in logs if attr_id in log) + pks = {log[attr_id] for log in logs if attr_id in log} return {rel.pk: rel for rel in model.objects.filter(pk__in=pks)} diff --git a/aircox/controllers/sound_stats.py b/aircox/controllers/sound_stats.py index 4bad14f..a4e5d50 100644 --- a/aircox/controllers/sound_stats.py +++ b/aircox/controllers/sound_stats.py @@ -24,16 +24,30 @@ class SoxStats: "Length s", ] - def __init__(self, path, **kwargs): + values = None + + def __init__(self, path=None, **kwargs): """If path is given, call analyse with path and kwargs.""" - self.values = {} if path: self.analyse(path, **kwargs) - def get(self, attr): - return self.values.get(attr) + def analyse(self, path, at=None, length=None): + """If at and length are given use them as excerpt to analyse.""" + args = ["sox", path, "-n"] + if at is not None and length is not None: + args += ["trim", str(at), str(length)] + args.append("stats") + + p = subprocess.Popen( + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + # sox outputs to stderr (my god WHYYYY) + out_, out = p.communicate() + self.values = self.parse(str(out, encoding="utf-8")) def parse(self, output): + """Parse sox output, settubg values from it.""" + values = {} for attr in self.attributes: value = re.search(attr + r"\s+(?P\S+)", output) value = value and value.groupdict() @@ -42,24 +56,12 @@ class SoxStats: value = float(value.get("value")) except ValueError: value = None - self.values[attr] = value - self.values["length"] = self.values["Length s"] + values[attr] = value + values["length"] = values.pop("Length s", None) + return values - def analyse(self, path, at=None, length=None): - """If at and length are given use them as excerpt to analyse.""" - args = ["sox", path, "-n"] - - if at is not None and length is not None: - args += ["trim", str(at), str(length)] - - args.append("stats") - - p = subprocess.Popen( - args, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - # sox outputs to stderr (my god WHYYYY) - out_, out = p.communicate() - self.parse(str(out, encoding="utf-8")) + def get(self, attr): + return self.values.get(attr) class SoundStats: diff --git a/aircox/test.py b/aircox/test.py index d5c61b0..070220c 100644 --- a/aircox/test.py +++ b/aircox/test.py @@ -3,7 +3,7 @@ from collections import namedtuple import inspect -__all__ = ("interface", "Interface") +__all__ = ("interface", "Interface", "File") def interface(obj, funcs): @@ -233,6 +233,7 @@ class Interface: def _irelease(self): """Shortcut to `self._imeta.release`.""" self._imeta.release() + self._imeta.reset() def _trace(self, *args, **kw): """Shortcut to `self._imeta.get_trace`.""" @@ -266,3 +267,23 @@ class Interface: def __str__(self): iface = super().__str__() return f"{iface}::{self._imeta.target}" + + +class File: + def __init__(self, data=""): + self.data = data + + def read(self): + return self.data + + def write(self, data): + self.data += data + + def close(self): + self.data = None + + def __enter__(self): + return self + + def __exit__(self, *_, **__): + pass diff --git a/aircox/tests/conftest.py b/aircox/tests/conftest.py index 37342cf..b74bfcd 100644 --- a/aircox/tests/conftest.py +++ b/aircox/tests/conftest.py @@ -115,3 +115,19 @@ def podcasts(episodes): @pytest.fixture def sound(program): return baker.make(models.Sound, file="tmp/test.wav", program=program) + + +@pytest.fixture +def tracks(episode, sound): + items = [ + baker.prepare( + models.Track, episode=episode, position=i, timestamp=i * 60 + ) + for i in range(0, 3) + ] + items += [ + baker.prepare(models.Track, sound=sound, position=i, timestamp=i * 60) + for i in range(0, 3) + ] + models.Track.objects.bulk_create(items) + return items diff --git a/aircox/tests/controllers/playlist.csv b/aircox/tests/controllers/playlist.csv new file mode 100644 index 0000000..0cfd3c4 --- /dev/null +++ b/aircox/tests/controllers/playlist.csv @@ -0,0 +1,3 @@ +Artist 1;Title 1;1;0;tag1,tag12;info1 +Artist 2;Title 2;2;1;tag2,tag12;info2 +Artist 3;Title 3;3;2;; diff --git a/aircox/tests/controllers/test_log_archiver.py b/aircox/tests/controllers/test_log_archiver.py index dd9d1ff..bc74b0e 100644 --- a/aircox/tests/controllers/test_log_archiver.py +++ b/aircox/tests/controllers/test_log_archiver.py @@ -1,37 +1,110 @@ -import pytest +from django.utils import timezone as tz -from aircox.controllers.log_archiver import LogArchiver +import pytest +from model_bakery import baker + +from aircox import models +from aircox.test import Interface, File +from aircox.controllers import log_archiver @pytest.fixture -def log_archiver(): - return LogArchiver() +def diffusions(episodes): + items = [ + baker.prepare( + models.Diffusion, + program=episode.program, + episode=episode, + type=models.Diffusion.TYPE_ON_AIR, + ) + for episode in episodes + ] + models.Diffusion.objects.bulk_create(items) + return items + + +@pytest.fixture +def logs(diffusions, sound, tracks): + now = tz.now() + station = diffusions[0].program.station + items = [ + models.Log( + station=diffusion.program.station, + type=models.Log.TYPE_START, + date=now + tz.timedelta(hours=-10, minutes=i), + source="13", + diffusion=diffusion, + ) + for i, diffusion in enumerate(diffusions) + ] + items += [ + models.Log( + station=station, + type=models.Log.TYPE_ON_AIR, + date=now + tz.timedelta(hours=-9, minutes=i), + source="14", + track=track, + sound=track.sound, + ) + for i, track in enumerate(tracks) + ] + models.Log.objects.bulk_create(items) + return items + + +@pytest.fixture +def logs_qs(logs): + return models.Log.objects.filter(pk__in=(r.pk for r in logs)) + + +@pytest.fixture +def file(): + return File(data=b"") + + +@pytest.fixture +def gzip(file): + gzip = Interface.inject(log_archiver, "gzip", {"open": file}) + yield gzip + gzip._irelease() + + +@pytest.fixture +def archiver(): + return log_archiver.LogArchiver() class TestLogArchiver: - def test_get_path(self): - pass + @pytest.mark.django_db + def test_archive_then_load_file(self, archiver, file, gzip, logs, logs_qs): + # before logs are deleted from db, get data + sorted = archiver.sort_logs(logs_qs) + paths = { + archiver.get_path(station, date) for station, date in sorted.keys() + } - def test_archive(self): - pass + count = archiver.archive(logs_qs, keep=False) + assert count == len(logs) + assert not logs_qs.count() + assert all( + path in paths for path, *_ in gzip._traces("open", args=True) + ) - def test_archive_no_qs(self): - pass + results = archiver.load_file("dummy path") + assert results - def test_archive_not_keep(self): - pass + @pytest.mark.django_db + def test_archive_no_qs(self, archiver): + count = archiver.archive(models.Log.objects.none()) + assert not count - def test_sort_log(self): - pass + @pytest.mark.django_db + def test_sort_log(self, archiver, logs_qs): + sorted = archiver.sort_logs(logs_qs) - def test_serialize(self): - pass - - def test_load(self): - pass - - def test_load_file_not_exists(self): - pass - - def test_get_relations(self): - pass + assert sorted + for (station, date), logs in sorted.items(): + assert all( + log.station == station and log.date.date() == date + for log in logs + ) diff --git a/aircox/tests/controllers/test_playlist_import.py b/aircox/tests/controllers/test_playlist_import.py index 257272f..5c102d1 100644 --- a/aircox/tests/controllers/test_playlist_import.py +++ b/aircox/tests/controllers/test_playlist_import.py @@ -1,22 +1,64 @@ +import os import pytest -from aircox.controller.playlist_import import PlaylistImport +from aircox.test import Interface +from aircox.controllers import playlist_import + + +csv_data = [ + { + "artist": "Artist 1", + "title": "Title 1", + "minutes": "1", + "seconds": "0", + "tags": "tag1,tag12", + "info": "info1", + }, + { + "artist": "Artist 2", + "title": "Title 2", + "minutes": "2", + "seconds": "1", + "tags": "tag2,tag12", + "info": "info2", + }, + { + "artist": "Artist 3", + "title": "Title 3", + "minutes": "3", + "seconds": "2", + "tags": "", + "info": "", + }, +] @pytest.fixture -def playlist_import(): - return PlaylistImport() +def importer(sound): + path = os.path.join(os.path.dirname(__file__), "playlist.csv") + return playlist_import.PlaylistImport(path, sound=sound) class TestPlaylistImport: - def test_reset(self): - pass + @pytest.mark.django_db + def test_run(self, importer): + iface = Interface(None, {"read": None, "make_playlist": None}) + importer.read = iface.read + importer.make_playlist = iface.make_playlist + importer.run() + assert iface._trace("read") + assert iface._trace("make_playlist") - def test_run(self): - pass + @pytest.mark.django_db + def test_read(self, importer): + importer.read() + assert importer.data == csv_data - def test_read(self): - pass - - def make_playlist(self): - pass + @pytest.mark.django_db + def test_make_playlist(self, importer, sound): + importer.data = csv_data + importer.make_playlist() + track_artists = sound.track_set.all().values_list("artist", flat=True) + csv_artists = {r["artist"] for r in csv_data} + assert set(track_artists) == csv_artists + # TODO: check other values diff --git a/aircox/tests/controllers/test_sound_stats.py b/aircox/tests/controllers/test_sound_stats.py index feacae1..9f75487 100644 --- a/aircox/tests/controllers/test_sound_stats.py +++ b/aircox/tests/controllers/test_sound_stats.py @@ -1,30 +1,75 @@ +import subprocess + import pytest -from aircox.controllers.sound_stats import SoxStats, SoundStats +from aircox.test import Interface +from aircox.controllers import sound_stats + + +sox_output = """ + DC offset 0.000000\n + Min level 0.000000\n + Max level 0.000000\n + Pk lev dB -inf\n + RMS lev dB -inf\n + RMS Pk dB -inf\n + RMS Tr dB -inf\n + Crest factor 1.00\n + Flat factor 179.37\n + Pk count 1.86G\n + Bit-depth 0/0\n + Num samples 930M\n + Length s 19383.312\n + Scale max 1.000000\n + Window s 0.050\n +""" +sox_values = { + "DC offset": 0.0, + "Min level": 0.0, + "Max level": 0.0, + "Pk lev dB": float("-inf"), + "RMS lev dB": float("-inf"), + "RMS Pk dB": float("-inf"), + "RMS Tr dB": float("-inf"), + "Flat factor": 179.37, + "length": 19383.312, +} @pytest.fixture -def sox_stats(): - return SoxStats() +def sox_interfaces(): + process = Interface( + None, {"communicate": ("", sox_output.encode("utf-8"))} + ) + subprocess = Interface.inject( + sound_stats, "subprocess", {"Popen": lambda *_, **__: process} + ) + yield {"process": process, "subprocess": subprocess} + subprocess._irelease() @pytest.fixture -def sound_stats(): - return SoundStats() +def sox_stats(sox_interfaces): + return sound_stats.SoxStats() + + +@pytest.fixture +def stats(): + return sound_stats.SoundStats() class TestSoxStats: - def test___init__(self): - pass + def test_parse(self, sox_stats): + values = sox_stats.parse(sox_output) + assert values == sox_values - def test_get(self): - pass - - def test_parse(self): - pass - - def test_analyse(self): - pass + def test_analyse(self, sox_stats, sox_interfaces): + sox_stats.analyse("fake_path", 1, 2) + assert sox_interfaces["subprocess"]._trace("Popen") == ( + (["sox", "fake_path", "-n", "trim", "1", "2", "stats"],), + {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE}, + ) + assert sox_stats.values == sox_values class TestSoundStats: