Compare commits

..

2 Commits

Author SHA1 Message Date
bkfox
fed076348b finish writing controllers.monitor tests 2023-06-18 16:56:34 +02:00
bkfox
a1ca0e70cf add aircox.test utilities 2023-06-18 16:07:09 +02:00
5 changed files with 394 additions and 39 deletions

33
aircox/test.py Normal file
View File

@ -0,0 +1,33 @@
"""This module provide test utilities."""
__all__ = ("interface",)
def interface(obj, funcs):
"""Override provided object's functions using dict of funcs, as
``{func_name: return_value}``.
Attribute ``obj.calls`` is a dict with all call done using those
methods, as ``{func_name: (args, kwargs) | list[(args, kwargs]]}``.
"""
if not isinstance(getattr(obj, "calls", None), dict):
obj.calls = {}
for attr, value in funcs.items():
interface_wrap(obj, attr, value)
def interface_wrap(obj, attr, value):
obj.calls[attr] = None
def wrapper(*a, **kw):
call = obj.calls.get(attr)
if call is None:
obj.calls[attr] = (a, kw)
elif isinstance(call, tuple):
obj.calls[attr] = [call, (a, kw)]
else:
call.append((a, kw))
return value
setattr(obj, attr, wrapper)

View File

@ -5,14 +5,10 @@
# x when liquidsoap fails to start/exists: exit # x when liquidsoap fails to start/exists: exit
# - handle restart after failure # - handle restart after failure
# - is stream restart after live ok? # - is stream restart after live ok?
import pytz
from django.utils import timezone as tz from django.utils import timezone as tz
from aircox.models import Diffusion, Log, Sound, Track from aircox.models import Diffusion, Log, Sound, Track
# force using UTC
tz.activate(pytz.UTC)
class Monitor: class Monitor:
"""Log and launch diffusions for the given station. """Log and launch diffusions for the given station.
@ -33,9 +29,9 @@ class Monitor:
""" Timedelta: minimal delay between two call of monitor. """ """ Timedelta: minimal delay between two call of monitor. """
logs = None logs = None
"""Queryset to station's logs (ordered by -pk)""" """Queryset to station's logs (ordered by -pk)"""
cancel_timeout = 20 cancel_timeout = tz.timedelta(minutes=20)
"""Timeout in minutes before cancelling a diffusion.""" """Timeout in minutes before cancelling a diffusion."""
sync_timeout = 5 sync_timeout = tz.timedelta(minutes=5)
"""Timeout in minutes between two streamer's sync.""" """Timeout in minutes between two streamer's sync."""
sync_next = None sync_next = None
"""Datetime of the next sync.""" """Datetime of the next sync."""
@ -56,11 +52,10 @@ class Monitor:
"""Log of last triggered item (sound or diffusion).""" """Log of last triggered item (sound or diffusion)."""
return self.logs.start().with_diff().first() return self.logs.start().with_diff().first()
def __init__(self, streamer, delay, cancel_timeout, **kwargs): def __init__(self, streamer, delay, **kwargs):
self.streamer = streamer self.streamer = streamer
# adding time ensure all calculation have a margin # adding time ensures all calculations have a margin
self.delay = delay + tz.timedelta(seconds=5) self.delay = delay + tz.timedelta(seconds=5)
self.cancel_timeout = cancel_timeout
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
self.logs = self.get_logs_queryset() self.logs = self.get_logs_queryset()
self.init_last_sound_logs() self.init_last_sound_logs()
@ -71,7 +66,7 @@ class Monitor:
"diffusion", "sound", "track" "diffusion", "sound", "track"
).order_by("-pk") ).order_by("-pk")
def init_last_sound_logs(self, key=None): def init_last_sound_logs(self):
"""Retrieve last logs and initialize `last_sound_logs`""" """Retrieve last logs and initialize `last_sound_logs`"""
logs = {} logs = {}
for source in self.streamer.sources: for source in self.streamer.sources:
@ -117,18 +112,6 @@ class Monitor:
self.handle_diffusions() self.handle_diffusions()
self.sync() self.sync()
def log(self, source, **kwargs):
"""Create a log using **kwargs, and print info."""
kwargs.setdefault("station", self.station)
kwargs.setdefault("date", tz.now())
log = Log(source=source, **kwargs)
log.save()
log.print()
if log.sound:
self.last_sound_logs[source] = log
return log
def trace_sound(self, source): def trace_sound(self, source):
"""Return on air sound log (create if not present).""" """Return on air sound log (create if not present)."""
air_uri, air_time = source.uri, source.air_time air_uri, air_time = source.uri, source.air_time
@ -176,7 +159,7 @@ class Monitor:
return return
tracks = Track.objects.filter( tracks = Track.objects.filter(
sound__id=log.sound_id, timestamp__isnull=False sound_id=log.sound_id, timestamp__isnull=False
).order_by("timestamp") ).order_by("timestamp")
if not tracks.exists(): if not tracks.exists():
return return
@ -186,15 +169,14 @@ class Monitor:
now = tz.now() now = tz.now()
for track in tracks: for track in tracks:
pos = log.date + tz.timedelta(seconds=track.timestamp) pos = log.date + tz.timedelta(seconds=track.timestamp)
if pos > now: if pos <= now:
break self.log(
self.log( type=Log.TYPE_ON_AIR,
type=Log.TYPE_ON_AIR, date=pos,
date=pos, source=log.source,
source=log.source, track=track,
track=track, comment=track,
comment=track, )
)
def handle_diffusions(self): def handle_diffusions(self):
"""Handle scheduled diffusion, trigger if needed, preload playlists and """Handle scheduled diffusion, trigger if needed, preload playlists and
@ -246,6 +228,18 @@ class Monitor:
if diff.start < now - self.cancel_timeout: if diff.start < now - self.cancel_timeout:
self.cancel_diff(dealer, diff) self.cancel_diff(dealer, diff)
def log(self, source, **kwargs):
"""Create a log using **kwargs, and print info."""
kwargs.setdefault("station", self.station)
kwargs.setdefault("date", tz.now())
log = Log(source=source, **kwargs)
log.save()
log.print()
if log.sound:
self.last_sound_logs[source] = log
return log
def start_diff(self, source, diff): def start_diff(self, source, diff):
playlist = Sound.objects.episode(id=diff.episode_id).playlist() playlist = Sound.objects.episode(id=diff.episode_id).playlist()
source.push(*playlist) source.push(*playlist)
@ -272,7 +266,7 @@ class Monitor:
if self.sync_next is not None and now < self.sync_next: if self.sync_next is not None and now < self.sync_next:
return return
self.sync_next = now + tz.timedelta(minutes=self.sync_timeout) self.sync_next = now + self.sync_timeout
for source in self.streamer.playlists: for source in self.streamer.playlists:
source.sync() source.sync()

View File

@ -17,6 +17,7 @@ from django.utils import timezone as tz
from aircox.models import Station from aircox.models import Station
from aircox_streamer.controllers import Monitor, Streamer from aircox_streamer.controllers import Monitor, Streamer
# force using UTC # force using UTC
tz.activate(pytz.UTC) tz.activate(pytz.UTC)
@ -68,7 +69,7 @@ class Command(BaseCommand):
"-t", "-t",
"--timeout", "--timeout",
type=float, type=float,
default=Monitor.cancel_timeout, default=Monitor.cancel_timeout.total_seconds() / 60,
help="time to wait in MINUTES before canceling a diffusion that " help="time to wait in MINUTES before canceling a diffusion that "
"should have ran but did not. ", "should have ran but did not. ",
) )
@ -106,7 +107,8 @@ class Command(BaseCommand):
delay = tz.timedelta(milliseconds=delay) delay = tz.timedelta(milliseconds=delay)
timeout = tz.timedelta(minutes=timeout) timeout = tz.timedelta(minutes=timeout)
monitors = [ monitors = [
Monitor(streamer, delay, timeout) for streamer in streamers Monitor(streamer, delay, cancel_timeout=timeout)
for streamer in streamers
] ]
while not run or streamer.is_running: while not run or streamer.is_running:

View File

@ -5,6 +5,7 @@ from datetime import datetime, time
import tzlocal import tzlocal
import pytest import pytest
from model_bakery import baker
from aircox import models from aircox import models
from aircox_streamer import controllers from aircox_streamer import controllers
@ -17,6 +18,36 @@ local_tz = tzlocal.get_localzone()
working_dir = os.path.join(os.path.dirname(__file__), "working_dir") working_dir = os.path.join(os.path.dirname(__file__), "working_dir")
def interface_wrap(obj, attr, value):
if not isinstance(getattr(obj, "calls", None), dict):
obj.calls = {}
obj.calls[attr] = None
def wrapper(*a, **kw):
call = obj.calls.get(attr)
if call is None:
obj.calls[attr] = (a, kw)
elif isinstance(call, tuple):
obj.calls[attr] = [call, (a, kw)]
else:
call.append((a, kw))
return value
setattr(obj, attr, wrapper)
def interface(obj, funcs):
"""Override provided object's functions using dict of funcs, as ``{
func_name: return_value}``.
Attribute ``obj.calls`` is a dict
with all call done using those methods, as
``{func_name: (args, kwargs)}``.
"""
for attr, value in funcs.items():
interface_wrap(obj, attr, value)
class FakeSocket: class FakeSocket:
FAILING_ADDRESS = -1 FAILING_ADDRESS = -1
"""Connect with this address fails.""" """Connect with this address fails."""
@ -142,6 +173,25 @@ def stream(program):
return stream return stream
@pytest.fixture
def episode(program):
return baker.make(models.Episode, title="test episode", program=program)
@pytest.fixture
def sound(program, episode):
sound = models.Sound(
program=program,
episode=episode,
name="sound",
type=models.Sound.TYPE_ARCHIVE,
position=0,
file="sound.mp3",
)
sound.save(check=False)
return sound
@pytest.fixture @pytest.fixture
def sounds(program): def sounds(program):
items = [ items = [
@ -218,6 +268,10 @@ def metadata_string(metadata_data):
# -- streamers # -- streamers
class FakeStreamer(controllers.Streamer): class FakeStreamer(controllers.Streamer):
calls = {} calls = {}
is_ready = False
def __init__(self, **kwargs):
self.__dict__.update(**kwargs)
def fetch(self): def fetch(self):
self.calls["fetch"] = True self.calls["fetch"] = True
@ -236,7 +290,7 @@ class FakeSource(controllers.Source):
def sync(self): def sync(self):
self.calls["sync"] = True self.calls["sync"] = True
def push(self, path): def push(self, *path):
self.calls["push"] = path self.calls["push"] = path
return path return path
@ -250,13 +304,34 @@ class FakeSource(controllers.Source):
self.calls["seek"] = c self.calls["seek"] = c
class FakePlaylist(FakeSource, controllers.PlaylistSource):
pass
class FakeQueueSource(FakeSource, controllers.QueueSource):
pass
@pytest.fixture
def streamer(station, station_ports):
streamer = FakeStreamer(station=station)
streamer.sources = [
FakePlaylist(i, uri=f"source-{i}") for i in range(0, 3)
]
streamer.sources.append(FakeQueueSource(len(streamer.sources)))
return streamer
@pytest.fixture @pytest.fixture
def streamers(stations, stations_ports): def streamers(stations, stations_ports):
streamers = controllers.Streamers(streamer_class=FakeStreamer) streamers = controllers.Streamers(streamer_class=FakeStreamer)
# avoid unecessary db calls # avoid unecessary db calls
streamers.streamers = { streamers.streamers = {
station.pk: FakeStreamer(station) for station in stations station.pk: FakeStreamer(station=station) for station in stations
} }
for streamer in streamers.values(): for j, streamer in enumerate(streamers.values()):
streamer.sources = [FakeSource(i) for i in range(0, 3)] streamer.sources = [
FakePlaylist(i, uri=f"source-{j}-{i}") for i in range(0, 3)
]
streamer.sources.append(FakeQueueSource(len(streamer.sources)))
return streamers return streamers

View File

@ -0,0 +1,251 @@
from django.utils import timezone as tz
import pytest
from model_bakery import baker
from aircox import models
from aircox.test import interface
from aircox_streamer import controllers
@pytest.fixture
def monitor(streamer):
streamer.calls = {}
return controllers.Monitor(
streamer,
tz.timedelta(seconds=10),
cancel_timeout=tz.timedelta(minutes=10),
sync_timeout=tz.timedelta(minutes=5),
)
@pytest.fixture
def diffusion(program, episode):
return baker.make(
models.Diffusion,
program=program,
episode=episode,
start=tz.now() - tz.timedelta(minutes=10),
end=tz.now() + tz.timedelta(minutes=30),
schedule=None,
type=models.Diffusion.TYPE_ON_AIR,
)
@pytest.fixture
def source(monitor, streamer, sound, diffusion):
source = next(monitor.streamer.playlists)
source.uri = sound.file.path
source.episode_id = sound.episode_id
source.air_time = diffusion.start + tz.timedelta(seconds=10)
return source
@pytest.fixture
def tracks(sound):
items = [
baker.prepare(models.Track, sound=sound, position=i, timestamp=i * 60)
for i in range(0, 4)
]
models.Track.objects.bulk_create(items)
return items
@pytest.fixture
def log(station, source, sound):
return baker.make(
models.Log,
station=station,
type=models.Log.TYPE_START,
sound=sound,
source=source.id,
)
class TestMonitor:
@pytest.mark.django_db(transaction=True)
def test_last_diff_start(self, monitor):
pass
@pytest.mark.django_db(transaction=True)
def test___init__(self, monitor):
assert isinstance(monitor.logs, models.LogQuerySet)
assert isinstance(monitor.last_sound_logs, dict)
@pytest.mark.django_db(transaction=True)
def test_get_logs_queryset(self, monitor, station, sounds):
query = monitor.get_logs_queryset()
assert all(log.station_id == station.pk for log in query)
@pytest.mark.django_db(transaction=True)
def test_init_last_sound_logs(self, monitor, source, log):
monitor.init_last_sound_logs()
assert monitor.last_sound_logs[source.id] == log
@pytest.mark.django_db(transaction=True)
def test_monitor(self, monitor, source, log, sound):
monitor.streamer.is_ready = True
monitor.streamer.source = source
interface(
monitor,
{
"trace_sound": log,
"trace_tracks": None,
"handle_diffusions": None,
"sync": None,
},
)
monitor.monitor()
assert monitor.streamer.calls.get("fetch")
assert monitor.calls["trace_sound"] == ((source,), {})
assert monitor.calls["trace_tracks"] == ((log,), {})
assert monitor.calls["handle_diffusions"]
assert monitor.calls["sync"]
@pytest.mark.django_db(transaction=True)
def test_monitor_streamer_not_ready(self, monitor):
monitor.streamer.is_ready = False
interface(
monitor,
{
"trace_sound": log,
"trace_tracks": None,
"handle_diffusions": None,
"sync": None,
},
)
monitor.monitor()
assert not monitor.streamer.calls.get("fetch")
assert monitor.calls["trace_sound"] is None
assert monitor.calls["trace_tracks"] is None
assert not monitor.calls["handle_diffusions"]
assert not monitor.calls["sync"]
@pytest.mark.django_db(transaction=True)
def test_monitor_no_source_uri(self, monitor, log):
source.uri = None
monitor.streamer.is_ready = True
monitor.streamer.source = source
interface(
monitor,
{
"trace_sound": log,
"trace_tracks": None,
"handle_diffusions": None,
"sync": None,
},
)
monitor.monitor()
assert monitor.streamer.calls.get("fetch")
assert monitor.calls["trace_sound"] is None
assert monitor.calls["trace_tracks"] is None
assert monitor.calls["handle_diffusions"]
assert monitor.calls["sync"]
@pytest.mark.django_db(transaction=True)
def test_trace_sound(self, monitor, diffusion, source, sound):
monitor.last_sound_logs[source.id] = None
result = monitor.trace_sound(source)
assert result.type == models.Log.TYPE_ON_AIR
assert result.source == source.id
assert result.sound == sound
assert result.diffusion == diffusion
@pytest.mark.django_db(transaction=True)
def test_trace_sound_returns_last_log(self, monitor, source, sound, log):
log.sound = sound
monitor.last_sound_logs[source.id] = log
result = monitor.trace_sound(source)
assert result == log
@pytest.mark.django_db(transaction=True)
def test_trace_tracks(self, monitor, log, tracks):
interface(monitor, {"log": None})
for track in tracks:
log.date = tz.now() - tz.timedelta(seconds=track.timestamp + 5)
monitor.trace_tracks(log)
assert monitor.calls["log"]
log_by_track = [call[1].get("track") for call in monitor.calls["log"]]
# only one call of log
assert all(log_by_track.count(track) for track in tracks)
@pytest.mark.django_db(transaction=True)
def test_trace_tracks_returns_on_log_diffusion(
self, monitor, log, diffusion, tracks
):
log.diffusion = None
monitor.trace_tracks(log)
@pytest.mark.django_db(transaction=True)
def test_trace_tracks_returns_on_no_tracks_exists(self, monitor, log):
log.diffusion = None
monitor.trace_tracks(log)
@pytest.mark.django_db(transaction=True)
def test_handle_diffusions(self, monitor):
pass
@pytest.mark.django_db(transaction=True)
def test_log(self, monitor, source):
log = monitor.log("source", type=models.Log.TYPE_START, comment="test")
assert log.source == "source"
assert log.type == models.Log.TYPE_START
assert log.comment == "test"
@pytest.mark.django_db(transaction=True)
def test_start_diff(
self, monitor, diffusion, source, episode, sound, tracks
):
result = {}
monitor.log = lambda **kw: result.update(kw)
monitor.start_diff(source, diffusion)
assert source.calls["push"] == (sound.file.path,)
assert result == {
"type": models.Log.TYPE_START,
"source": source.id,
"diffusion": diffusion,
"comment": str(diffusion),
}
@pytest.mark.django_db(transaction=True)
def test_cancel_diff(self, monitor, source, diffusion):
result = {}
monitor.log = lambda **kw: result.update(kw)
monitor.cancel_diff(source, diffusion)
assert diffusion.type == models.Log.TYPE_CANCEL
assert result == {
"type": models.Log.TYPE_CANCEL,
"source": source.id,
"diffusion": diffusion,
"comment": str(diffusion),
}
@pytest.mark.django_db(transaction=True)
def test_sync(self, monitor):
now = tz.now()
monitor.sync_next = now - tz.timedelta(minutes=1)
monitor.sync()
assert monitor.sync_next >= now + monitor.sync_timeout
assert all(
source.calls.get("sync") for source in monitor.streamer.playlists
)
@pytest.mark.django_db(transaction=True)
def test_sync_timeout_not_reached_skip_sync(self, monitor):
monitor.sync_next = tz.now() + tz.timedelta(
seconds=monitor.sync_timeout.total_seconds() + 20
)
monitor.sync()
assert all(
not source.calls.get("sync")
for source in monitor.streamer.playlists
)