fix tests

This commit is contained in:
bkfox 2024-03-29 01:54:58 +01:00
parent 870c866541
commit bd1b996695
17 changed files with 209 additions and 111 deletions

View File

@ -72,10 +72,10 @@ class SoundFile:
sound.save() sound.save()
if not sound.episodesound_set.all().exists(): if not sound.episodesound_set.all().exists():
self.find_episode_sound(sound) self.create_episode_sound(sound)
return sound return sound
def find_episode_sound(self, sound): def create_episode_sound(self, sound):
episode = sound.find_episode() episode = sound.find_episode()
if episode: if episode:
# FIXME: position from name # FIXME: position from name

View File

@ -1,5 +1,6 @@
import os import os
from django.conf import settings as d_settings
from django.db import models from django.db import models
from django.utils.functional import cached_property from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
@ -105,12 +106,11 @@ class EpisodeSoundQuerySet(models.QuerySet):
return self.available().filter(broadcast=True) return self.available().filter(broadcast=True)
def playlist(self, order="position"): def playlist(self, order="position"):
# TODO: subquery expression
if order: if order:
self = self.order_by(order) self = self.order_by(order)
return [ query = self.filter(sound__file__isnull=False, sound__is_removed=False).values_list("sound__file", flat=True)
os.path.join(settings.MEDIA_ROOT, file) return [os.path.join(d_settings.MEDIA_ROOT, file) for file in query]
for file in self.filter(file__isnull=False, is_removed=False).Values_list("file", flat=True)
]
class EpisodeSound(models.Model): class EpisodeSound(models.Model):

View File

@ -91,12 +91,12 @@ def schedule_post_save(sender, instance, created, *args, **kwargs):
def schedule_pre_delete(sender, instance, *args, **kwargs): def schedule_pre_delete(sender, instance, *args, **kwargs):
"""Delete later corresponding diffusion to a changed schedule.""" """Delete later corresponding diffusion to a changed schedule."""
Diffusion.objects.filter(schedule=instance).after(tz.now()).delete() Diffusion.objects.filter(schedule=instance).after(tz.now()).delete()
Episode.objects.filter(diffusion__isnull=True, content__isnull=True, sound__isnull=True).delete() Episode.objects.filter(diffusion__isnull=True, content__isnull=True, episodesound__isnull=True).delete()
@receiver(signals.post_delete, sender=Diffusion) @receiver(signals.post_delete, sender=Diffusion)
def diffusion_post_delete(sender, instance, *args, **kwargs): def diffusion_post_delete(sender, instance, *args, **kwargs):
Episode.objects.filter(diffusion__isnull=True, content__isnull=True, sound__isnull=True).delete() Episode.objects.filter(diffusion__isnull=True, content__isnull=True, episodesound__isnull=True).delete()
@receiver(signals.post_delete, sender=Sound) @receiver(signals.post_delete, sender=Sound)

View File

@ -26,16 +26,11 @@ class SoundQuerySet(FileQuerySet):
"""Return sounds that are archives.""" """Return sounds that are archives."""
return self.filter(broadcast=True) return self.filter(broadcast=True)
def playlist(self, broadcast=True, order_by=True): def playlist(self, order_by="file"):
"""Return files absolute paths as a flat list (exclude sound without """Return files absolute paths as a flat list (exclude sound without
path). path)."""
If `order_by` is True, order by path.
"""
if broadcast:
self = self.broadcast()
if order_by: if order_by:
self = self.order_by("file") self = self.order_by(order_by)
return [ return [
os.path.join(conf.MEDIA_ROOT, file) os.path.join(conf.MEDIA_ROOT, file)
for file in self.filter(file__isnull=False).values_list("file", flat=True) for file in self.filter(file__isnull=False).values_list("file", flat=True)
@ -66,6 +61,8 @@ class Sound(File):
help_text=_("The sound is broadcasted on air"), help_text=_("The sound is broadcasted on air"),
) )
objects = SoundQuerySet.as_manager()
class Meta: class Meta:
verbose_name = _("Sound file") verbose_name = _("Sound file")
verbose_name_plural = _("Sound files") verbose_name_plural = _("Sound files")

View File

@ -131,25 +131,32 @@ def episode(episodes):
@pytest.fixture @pytest.fixture
def podcasts(episodes): def sound(program):
items = [] return baker.make(models.Sound, file="tmp/test.wav", program=program)
for episode in episodes:
sounds = baker.prepare(
models.Sound,
episode=episode,
program=episode.program,
is_public=True,
_quantity=2,
)
for i, sound in enumerate(sounds):
sound.file = f"test_sound_{episode.pk}_{i}.mp3"
items += sounds
return items
@pytest.fixture @pytest.fixture
def sound(program): def sounds(program):
return baker.make(models.Sound, file="tmp/test.wav", program=program) objs = [
models.Sound(program=program, file=f"tmp/test-{i}.wav", broadcast=(i == 0), is_downloadable=(i == 1))
for i in range(0, 3)
]
models.Sound.objects.bulk_create(objs)
return objs
@pytest.fixture
def podcasts(episode, sounds):
objs = [
models.EpisodeSound(
episode=episode,
sound=sound,
broadcast=True,
)
for sound in sounds
]
models.EpisodeSound.objects.bulk_create(objs)
return objs
@pytest.fixture @pytest.fixture

View File

@ -1,14 +1,12 @@
import pytest import pytest
from datetime import timedelta
from django.conf import settings as conf from django.conf import settings as conf
from django.utils import timezone as tz
from aircox import models
from aircox.controllers.sound_file import SoundFile from aircox.controllers.sound_file import SoundFile
# FIXME: use from tests.models.sound
@pytest.fixture @pytest.fixture
def path_infos(): def path_infos():
return { return {
@ -27,6 +25,7 @@ def path_infos():
"day": 2, "day": 2,
"hour": 10, "hour": 10,
"minute": 13, "minute": 13,
"n": None,
"name": "Sample 2", "name": "Sample 2",
}, },
"test/20220103_1_sample_3.mp3": { "test/20220103_1_sample_3.mp3": {
@ -56,42 +55,25 @@ def sound_files(path_infos):
return {k: r for k, r in ((path, SoundFile(conf.MEDIA_ROOT + "/" + path)) for path in path_infos.keys())} return {k: r for k, r in ((path, SoundFile(conf.MEDIA_ROOT + "/" + path)) for path in path_infos.keys())}
@pytest.fixture
def sound_file(sound_files):
return next(sound_files.items())
def test_sound_path(sound_files): def test_sound_path(sound_files):
for path, sound_file in sound_files.items(): for path, sound_file in sound_files.items():
assert path == sound_file.sound_path assert path == sound_file.sound_path
def test_read_path(path_infos, sound_files): class TestSoundFile:
for path, sound_file in sound_files.items(): def sound_path(self, sound_file):
expected = path_infos[path] assert sound_file[0] == sound_file[1].sound_path
result = sound_file.read_path(path)
# remove None values
result = {k: v for k, v in result.items() if v is not None}
assert expected == result, "path: {}".format(path)
def sync(self):
raise NotImplementedError("test is not implemented")
def _setup_diff(program, info): def create_episode_sound(self):
episode = models.Episode(program=program, title="test-episode") raise NotImplementedError("test is not implemented")
at = tz.datetime(**{k: info[k] for k in ("year", "month", "day", "hour", "minute") if info.get(k)})
at = tz.make_aware(at)
diff = models.Diffusion(episode=episode, start=at, end=at + timedelta(hours=1))
episode.save()
diff.save()
return diff
def _on_delete(self):
@pytest.mark.django_db(transaction=True) raise NotImplementedError("test is not implemented")
def test_find_episode(sound_files):
station = models.Station(name="test-station")
program = models.Program(station=station, title="test")
station.save()
program.save()
for path, sound_file in sound_files.items():
infos = sound_file.read_path(path)
diff = _setup_diff(program, infos)
sound = models.Sound(program=diff.program, file=path)
result = sound_file.find_episode(sound, infos)
assert diff.episode == result
# TODO: find_playlist, sync

View File

@ -223,22 +223,19 @@ class TestSoundMonitor:
[ [
(("scan all programs...",), {}), (("scan all programs...",), {}),
] ]
+ [ + [((f"#{program.id} {program.title}",), {}) for program in programs]
((f"#{program.id} {program.title}",), {})
for program in programs
]
) )
assert dirs == [program.abspath for program in programs] assert dirs == [program.abspath for program in programs]
traces = tuple( traces = tuple(
[ [
[ [
( (
(program, settings.SOUND_ARCHIVES_SUBDIR), (program, settings.SOUND_BROADCASTS_SUBDIR),
{"logger": logger, "type": Sound.TYPE_ARCHIVE}, {"logger": logger, "broadcast": True},
), ),
( (
(program, settings.SOUND_EXCERPTS_SUBDIR), (program, settings.SOUND_EXCERPTS_SUBDIR),
{"logger": logger, "type": Sound.TYPE_EXCERPT}, {"logger": logger, "broadcast": False},
), ),
] ]
for program in programs for program in programs
@ -247,6 +244,7 @@ class TestSoundMonitor:
traces_flat = tuple([item for sublist in traces for item in sublist]) traces_flat = tuple([item for sublist in traces for item in sublist])
assert interface._traces("scan_for_program") == traces_flat assert interface._traces("scan_for_program") == traces_flat
# TODO / FIXME
def broken_test_monitor(self, monitor, monitor_interfaces, logger): def broken_test_monitor(self, monitor, monitor_interfaces, logger):
def sleep(*args, **kwargs): def sleep(*args, **kwargs):
monitor.stop() monitor.stop()
@ -260,6 +258,7 @@ class TestSoundMonitor:
assert observer assert observer
schedules = observer._traces("schedule") schedules = observer._traces("schedule")
for (handler, *_), kwargs in schedules: for (handler, *_), kwargs in schedules:
breakpoint()
assert isinstance(handler, sound_monitor.MonitorHandler) assert isinstance(handler, sound_monitor.MonitorHandler)
assert isinstance(handler.pool, futures.ThreadPoolExecutor) assert isinstance(handler.pool, futures.ThreadPoolExecutor)
assert (handler.subdir, handler.type) in ( assert (handler.subdir, handler.type) in (

View File

@ -0,0 +1,122 @@
from datetime import timedelta
import os
import pytest
from django.conf import settings
from django.utils import timezone as tz
from aircox import models
@pytest.fixture
def path_infos():
return {
"test/20220101_10h13_1_sample_1.mp3": {
"year": 2022,
"month": 1,
"day": 1,
"hour": 10,
"minute": 13,
"n": 1,
"name": "Sample 1",
},
"test/20220102_10h13_sample_2.mp3": {
"year": 2022,
"month": 1,
"day": 2,
"hour": 10,
"minute": 13,
"n": None,
"name": "Sample 2",
},
"test/20220103_1_sample_3.mp3": {
"year": 2022,
"month": 1,
"day": 3,
"hour": None,
"minute": None,
"n": 1,
"name": "Sample 3",
},
"test/20220104_sample_4.mp3": {
"year": 2022,
"month": 1,
"day": 4,
"hour": None,
"minute": None,
"n": None,
"name": "Sample 4",
},
"test/20220105.mp3": {
"year": 2022,
"month": 1,
"day": 5,
"hour": None,
"minute": None,
"n": None,
"name": "20220105",
},
}
class TestSoundQuerySet:
@pytest.mark.django_db
def test_downloadable(self, sounds):
query = models.Sound.objects.downloadable().values_list("is_downloadable", flat=True)
assert set(query) == {True}
@pytest.mark.django_db
def test_broadcast(self, sounds):
query = models.Sound.objects.broadcast().values_list("broadcast", flat=True)
assert set(query) == {True}
@pytest.mark.django_db
def test_playlist(self, sounds):
expected = [os.path.join(settings.MEDIA_ROOT, s.file.path) for s in sounds]
assert models.Sound.objects.all().playlist() == expected
class TestSound:
@pytest.mark.django_db
def test_read_path(self, path_infos):
for path, expected in path_infos.items():
result = models.Sound.read_path(path)
assert expected == result
@pytest.mark.django_db
def test__as_name(self):
name = "some_1_file"
assert models.Sound._as_name(name) == "Some 1 File"
def _setup_diff(self, program, info):
episode = models.Episode(program=program, title="test-episode")
at = tz.datetime(**{k: info[k] for k in ("year", "month", "day", "hour", "minute") if info.get(k)})
at = tz.make_aware(at)
diff = models.Diffusion(episode=episode, start=at, end=at + timedelta(hours=1))
episode.save()
diff.save()
return diff
@pytest.mark.django_db(transaction=True)
def test_find_episode(self, program, path_infos):
for path, infos in path_infos.items():
diff = self._setup_diff(program, infos)
sound = models.Sound(program=diff.program, file=path)
result = sound.find_episode(infos)
assert diff.episode == result
@pytest.mark.django_db
def test_find_playlist(self):
raise NotImplementedError("test is not implemented")
@pytest.mark.django_db
def test_get_upload_dir(self):
raise NotImplementedError("test is not implemented")
@pytest.mark.django_db
def test_sync_fs(self):
raise NotImplementedError("test is not implemented")
@pytest.mark.django_db
def test_read_metadata(self):
raise NotImplementedError("test is not implemented")

View File

@ -1,10 +1,8 @@
# FIXME: this should be cleaner
from itertools import chain from itertools import chain
import json import json
import pytest import pytest
from django.urls import reverse from django.urls import reverse
from django.core.files.uploadedfile import SimpleUploadedFile
from aircox.models import Program
@pytest.mark.django_db() @pytest.mark.django_db()
@ -22,20 +20,6 @@ def test_edit_program(user, client, program):
assert b"foobar" in response.content assert b"foobar" in response.content
@pytest.mark.django_db()
def test_add_cover(user, client, program, png_content):
assert program.cover is None
user.groups.add(program.editors)
client.force_login(user)
cover = SimpleUploadedFile("cover1.png", png_content, content_type="image/png")
r = client.post(
reverse("program-edit", kwargs={"pk": program.pk}), {"content": "foobar", "new_cover": cover}, follow=True
)
assert r.status_code == 200
p = Program.objects.get(pk=program.pk)
assert "cover1.png" in p.cover.url
@pytest.mark.django_db() @pytest.mark.django_db()
def test_edit_tracklist(user, client, program, episode, tracks): def test_edit_tracklist(user, client, program, episode, tracks):
user.groups.add(program.editors) user.groups.add(program.editors)

View File

@ -11,6 +11,9 @@ class FakeView:
def ___init__(self): def ___init__(self):
self.kwargs = {} self.kwargs = {}
def dispatch(self, *args, **kwargs):
pass
def get(self, *args, **kwargs): def get(self, *args, **kwargs):
pass pass

View File

@ -43,7 +43,6 @@ class TestBaseView:
"view": base_view, "view": base_view,
"station": station, "station": station,
"page": None, # get_page() returns None "page": None, # get_page() returns None
"audio_streams": station.streams,
"model": base_view.model, "model": base_view.model,
} }

View File

@ -40,7 +40,7 @@ def parent_mixin():
@pytest.fixture @pytest.fixture
def attach_mixin(): def attach_mixin():
class Mixin(mixins.AttachedToMixin, FakeView): class Mixin(mixins.AttachedToMixin, FakeView):
attach_to_value = models.StaticPage.ATTACH_TO_HOME attach_to_value = models.StaticPage.Target.HOME
return Mixin() return Mixin()
@ -105,10 +105,10 @@ class TestParentMixin:
def test_get_parent_not_parent_url_kwargs(self, parent_mixin): def test_get_parent_not_parent_url_kwargs(self, parent_mixin):
assert parent_mixin.get_parent(self.req) is None assert parent_mixin.get_parent(self.req) is None
def test_get_calls_parent(self, parent_mixin): def test_dispatch_calls_parent(self, parent_mixin):
parent = "parent object" parent = "parent object"
parent_mixin.get_parent = lambda *_, **kw: parent parent_mixin.get_parent = lambda *_, **kw: parent
parent_mixin.get(self.req) parent_mixin.dispatch(self.req)
assert parent_mixin.parent == parent assert parent_mixin.parent == parent
@pytest.mark.django_db @pytest.mark.django_db
@ -120,7 +120,7 @@ class TestParentMixin:
assert set(query) == episodes_id assert set(query) == episodes_id
def test_get_context_data_with_parent(self, parent_mixin): def test_get_context_data_with_parent(self, parent_mixin):
parent_mixin.parent = Interface(cover="parent-cover") parent_mixin.parent = Interface(cover=Interface(url="parent-cover"))
context = parent_mixin.get_context_data() context = parent_mixin.get_context_data()
assert context["cover"] == "parent-cover" assert context["cover"] == "parent-cover"

View File

@ -7,7 +7,6 @@ __all__ = ("BaseView", "BaseAPIView")
class BaseView(TemplateResponseMixin, ContextMixin): class BaseView(TemplateResponseMixin, ContextMixin):
header_template_name = "aircox/widgets/header.html"
related_count = 4 related_count = 4
related_carousel_count = 8 related_carousel_count = 8
@ -50,8 +49,8 @@ class BaseView(TemplateResponseMixin, ContextMixin):
return None return None
def get_context_data(self, **kwargs): def get_context_data(self, **kwargs):
kwargs.setdefault("station", self.station)
kwargs.setdefault("page", self.get_page()) kwargs.setdefault("page", self.get_page())
kwargs.setdefault("header_template_name", self.header_template_name)
if "model" not in kwargs: if "model" not in kwargs:
model = getattr(self, "model", None) or hasattr(self, "object") and type(self.object) model = getattr(self, "model", None) or hasattr(self, "object") and type(self.object)

View File

@ -133,8 +133,10 @@ class Monitor:
# get sound # get sound
diff = None diff = None
sound = Sound.objects.path(air_uri).first() sound = Sound.objects.path(air_uri).first()
if sound and sound.episode_id is not None: if sound:
diff = Diffusion.objects.episode(id=sound.episode_id).on_air().now(air_time).first() ids = sound.episodesound_set.values_list("episode_id", flat=True)
if ids:
diff = Diffusion.objects.filter(episode_id__in=ids).on_air().now(air_time).first()
# log sound on air # log sound on air
return self.log( return self.log(

View File

@ -146,24 +146,28 @@ def episode(program):
def sound(program, episode): def sound(program, episode):
sound = models.Sound( sound = models.Sound(
program=program, program=program,
episode=episode,
name="sound", name="sound",
type=models.Sound.TYPE_ARCHIVE, broadcast=True,
position=0,
file="sound.mp3", file="sound.mp3",
) )
sound.save(check=False) sound.save(sync=False)
return sound return sound
@pytest.fixture
def episode_sound(episode, sound):
obj = models.EpisodeSound(episode=episode, sound=sound, position=0, broadcast=sound.broadcast)
obj.save()
return obj
@pytest.fixture @pytest.fixture
def sounds(program): def sounds(program):
items = [ items = [
models.Sound( models.Sound(
name=f"sound {i}", name=f"sound {i}",
program=program, program=program,
type=models.Sound.TYPE_ARCHIVE, broadcast=True,
position=i,
file=f"sound-{i}.mp3", file=f"sound-{i}.mp3",
) )
for i in range(0, 3) for i in range(0, 3)

View File

@ -20,7 +20,7 @@ def monitor(streamer):
@pytest.fixture @pytest.fixture
def diffusion(program, episode, sound): def diffusion(program, episode, episode_sound):
return baker.make( return baker.make(
models.Diffusion, models.Diffusion,
program=program, program=program,
@ -33,10 +33,10 @@ def diffusion(program, episode, sound):
@pytest.fixture @pytest.fixture
def source(monitor, streamer, sound, diffusion): def source(monitor, streamer, episode_sound, diffusion):
source = next(monitor.streamer.playlists) source = next(monitor.streamer.playlists)
source.uri = sound.file.path source.uri = episode_sound.sound.file.path
source.episode_id = sound.episode_id source.episode_id = episode_sound.episode_id
source.air_time = diffusion.start + tz.timedelta(seconds=10) source.air_time = diffusion.start + tz.timedelta(seconds=10)
return source return source
@ -185,7 +185,7 @@ class TestMonitor:
monitor.trace_tracks(log) monitor.trace_tracks(log)
@pytest.mark.django_db(transaction=True) @pytest.mark.django_db(transaction=True)
def test_handle_diffusions(self, monitor, streamer, diffusion, sound): def test_handle_diffusions(self, monitor, streamer, diffusion, episode_sound):
interface( interface(
monitor, monitor,
{ {

View File

@ -67,7 +67,7 @@ class TestPlaylistSource:
@pytest.mark.django_db @pytest.mark.django_db
def test_get_sound_queryset(self, playlist_source, sounds): def test_get_sound_queryset(self, playlist_source, sounds):
query = playlist_source.get_sound_queryset() query = playlist_source.get_sound_queryset()
assert all(r.program_id == playlist_source.program.pk and r.type == r.TYPE_ARCHIVE for r in query) assert all(r.program_id == playlist_source.program.pk and r.broadcast for r in query)
@pytest.mark.django_db @pytest.mark.django_db
def test_get_playlist(self, playlist_source, sounds): def test_get_playlist(self, playlist_source, sounds):