diff --git a/aircox/test.py b/aircox/test.py new file mode 100644 index 0000000..0ce71ae --- /dev/null +++ b/aircox/test.py @@ -0,0 +1,33 @@ +"""This module provide test utilities.""" + + +__all__ = ("interface",) + + +def interface(obj, funcs): + """Override provided object's functions using dict of funcs, as + ``{func_name: return_value}``. + + Attribute ``obj.calls`` is a dict with all call done using those + methods, as ``{func_name: (args, kwargs) | list[(args, kwargs]]}``. + """ + if not isinstance(getattr(obj, "calls", None), dict): + obj.calls = {} + for attr, value in funcs.items(): + interface_wrap(obj, attr, value) + + +def interface_wrap(obj, attr, value): + obj.calls[attr] = None + + def wrapper(*a, **kw): + call = obj.calls.get(attr) + if call is None: + obj.calls[attr] = (a, kw) + elif isinstance(call, tuple): + obj.calls[attr] = [call, (a, kw)] + else: + call.append((a, kw)) + return value + + setattr(obj, attr, wrapper) diff --git a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.mo b/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.mo deleted file mode 100644 index 2c90dd0..0000000 Binary files a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.mo and /dev/null differ diff --git a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po b/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po deleted file mode 100644 index a7628cc..0000000 --- a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po +++ /dev/null @@ -1,130 +0,0 @@ -# SOME DESCRIPTIVE TITLE. -# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER -# This file is distributed under the same license as the PACKAGE package. -# FIRST AUTHOR , YEAR. -# -#, fuzzy -msgid "" -msgstr "" -"Project-Id-Version: PACKAGE VERSION\n" -"Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2020-01-06 14:14+0100\n" -"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" -"Last-Translator: FULL NAME \n" -"Language-Team: LANGUAGE \n" -"Language: \n" -"MIME-Version: 1.0\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Content-Transfer-Encoding: 8bit\n" -"Plural-Forms: nplurals=2; plural=(n > 1);\n" - -#: models.py:37 -msgid "input" -msgstr "" - -#: models.py:38 -msgid "output" -msgstr "" - -#: models.py:56 -msgid "station" -msgstr "" - -#: models.py:58 -msgid "direction" -msgstr "" - -#: models.py:59 -msgid "type" -msgstr "" - -#: models.py:61 -msgid "active" -msgstr "" - -#: models.py:62 -msgid "this port is active" -msgstr "" - -#: models.py:65 -msgid "port settings" -msgstr "" - -#: models.py:66 -msgid "" -"list of comma separated params available; this is put in the output config " -"file as raw code; plugin related" -msgstr "" - -#: templates/aircox_streamer/source_item.html:19 -msgid "Synchronize source with Liquidsoap" -msgstr "" - -#: templates/aircox_streamer/source_item.html:23 -msgid "Synchronise" -msgstr "" - -#: templates/aircox_streamer/source_item.html:26 -msgid "Restart current track" -msgstr "" - -#: templates/aircox_streamer/source_item.html:30 -msgid "Restart" -msgstr "" - -#: templates/aircox_streamer/source_item.html:33 -msgid "Skip current file" -msgstr "" - -#: templates/aircox_streamer/source_item.html:34 -msgid "Skip" -msgstr "" - -#: templates/aircox_streamer/source_item.html:43 -msgid "Add sound" -msgstr "" - -#: templates/aircox_streamer/source_item.html:51 -msgid "Select a sound" -msgstr "" - -#: templates/aircox_streamer/source_item.html:53 -msgid "Add a sound to the queue (queue may start playing)" -msgstr "" - -#: templates/aircox_streamer/source_item.html:62 -msgid "Add" -msgstr "" - -#: templates/aircox_streamer/source_item.html:68 -msgid "Sounds in queue" -msgstr "" - -#: templates/aircox_streamer/source_item.html:86 -msgid "Status" -msgstr "" - -#: templates/aircox_streamer/source_item.html:96 -msgid "Air time" -msgstr "" - -#: templates/aircox_streamer/source_item.html:106 -msgid "Time left" -msgstr "" - -#: templates/aircox_streamer/source_item.html:114 -msgid "Data source" -msgstr "" - -#: templates/aircox_streamer/streamer.html:19 -msgid "Reload" -msgstr "" - -#: templates/aircox_streamer/streamer.html:26 -#: templates/aircox_streamer/streamer.html:27 -msgid "Select a station" -msgstr "" - -#: urls.py:9 views.py:9 -msgid "Streamer Monitor" -msgstr "" diff --git a/aircox_streamer/connector.py b/aircox_streamer/connector.py index 5e4b5ef..7f1e2d0 100755 --- a/aircox_streamer/connector.py +++ b/aircox_streamer/connector.py @@ -12,6 +12,8 @@ class Connector: Received data can be parsed from list of `key=value` or JSON. """ + socket_class = socket.socket + """Socket class to instanciate on open.""" socket = None """The socket.""" address = None @@ -26,27 +28,45 @@ class Connector: if address: self.address = address + def __enter__(self): + r = self.open() + if r == -1: + raise RuntimeError("can not open socket.") + return self + + def __exit__(self): + self.close() + def open(self): + """Open connection. + + :return: 0 (success), 1 (already opened), -1 (failure) + """ if self.is_open: - return + return 1 family = ( socket.AF_UNIX if isinstance(self.address, str) else socket.AF_INET ) try: - self.socket = socket.socket(family, socket.SOCK_STREAM) + self.socket = self.socket_class(family, socket.SOCK_STREAM) self.socket.connect(self.address) + return 0 except Exception: + import traceback + + traceback.print_exc() self.close() return -1 def close(self): - self.socket.close() - self.socket = None + if self.is_open: + self.socket.close() + self.socket = None # FIXME: return None on failed def send(self, *data, try_count=1, parse=False, parse_json=False): - if self.open(): + if self.open() == -1: return None data = bytes("".join([str(d) for d in data]) + "\n", encoding="utf-8") diff --git a/aircox_streamer/controllers.py b/aircox_streamer/controllers.py deleted file mode 100755 index ca1d233..0000000 --- a/aircox_streamer/controllers.py +++ /dev/null @@ -1,404 +0,0 @@ -import atexit -import logging -import os -import re -import signal -import subprocess - -import psutil -import tzlocal -from django.template.loader import render_to_string -from django.utils import timezone as tz -from django.utils.translation import gettext_lazy as _ - -from aircox.conf import settings -from aircox.utils import to_seconds - -from .connector import Connector - -__all__ = [ - "BaseMetadata", - "Request", - "Streamer", - "Source", - "PlaylistSource", - "QueueSource", -] - -# TODO: for the moment, update in station and program names do not update the -# related fields. - -# FIXME liquidsoap does not manage timezones -- we have to convert -# 'on_air' metadata we get from it into utc one in order to work -# correctly. - -local_tz = tzlocal.get_localzone() -logger = logging.getLogger("aircox") - - -class BaseMetadata: - """Base class for handling request metadata.""" - - controller = None - """Controller.""" - rid = None - """Request id.""" - uri = None - """Request uri.""" - status = None - """Current playing status.""" - request_status = None - """Requests' status.""" - air_time = None - """Launch datetime.""" - - def __init__(self, controller=None, rid=None, data=None): - self.controller = controller - self.rid = rid - if data is not None: - self.validate(data) - - @property - def is_playing(self): - return self.status == "playing" - - @property - def status_verbose(self): - return self.validate_status(self.status, True) - - def fetch(self): - data = self.controller.send("request.metadata ", self.rid, parse=True) - if data: - self.validate(data) - - def validate_status(self, status, i18n=False): - on_air = self.controller.source - if ( - on_air - and status == "playing" - and (on_air == self or on_air.rid == self.rid) - ): - return _("playing") if i18n else "playing" - elif status == "playing": - return _("paused") if i18n else "paused" - else: - return _("stopped") if i18n else "stopped" - - def validate_air_time(self, air_time): - if air_time: - air_time = tz.datetime.strptime(air_time, "%Y/%m/%d %H:%M:%S") - return local_tz.localize(air_time) - - def validate(self, data): - """Validate provided data and set as attribute (must already be - declared)""" - for key, value in data.items(): - if hasattr(self, key) and not callable(getattr(self, key)): - setattr(self, key, value) - self.uri = data.get("initial_uri") - - self.air_time = self.validate_air_time(data.get("on_air")) - self.status = self.validate_status(data.get("status")) - self.request_status = data.get("status") - - -class Request(BaseMetadata): - title = None - artist = None - - -class Streamer: - connector = None - process = None - - station = None - template_name = "aircox_streamer/scripts/station.liq" - path = None - """Config path.""" - sources = None - """List of all monitored sources.""" - source = None - """Current source being played on air.""" - # note: we disable on_air rids since we don't have use of it for the - # moment - # on_air = None - # """ On-air request ids (rid) """ - inputs = None - """Queryset to input ports.""" - outputs = None - """Queryset to output ports.""" - - def __init__(self, station, connector=None): - self.station = station - self.inputs = self.station.port_set.active().input() - self.outputs = self.station.port_set.active().output() - - self.id = self.station.slug.replace("-", "_") - self.path = os.path.join(station.path, "station.liq") - self.connector = Connector(os.path.join(station.path, "station.sock")) - self.init_sources() - - @property - def socket_path(self): - """Path to Unix socket file.""" - return self.connector.address - - @property - def is_ready(self): - """If external program is ready to use, returns True.""" - return self.send("list") != "" - - @property - def is_running(self): - """True if holds a running process.""" - if self.process is None: - return False - - returncode = self.process.poll() - if returncode is None: - return True - - self.process = None - logger.debug("process died with return code %s" % returncode) - return False - - @property - def playlists(self): - return (s for s in self.sources if isinstance(s, PlaylistSource)) - - @property - def queues(self): - return (s for s in self.sources if isinstance(s, QueueSource)) - - # Sources and config ############################################### - def send(self, *args, **kwargs): - return self.connector.send(*args, **kwargs) or "" - - def init_sources(self): - streams = self.station.program_set.filter(stream__isnull=False) - self.dealer = QueueSource(self, "dealer") - self.sources = [self.dealer] + [ - PlaylistSource(self, program=program) for program in streams - ] - - def make_config(self): - """Make configuration files and directory (and sync sources)""" - data = render_to_string( - self.template_name, - { - "station": self.station, - "streamer": self, - "settings": settings, - }, - ) - data = re.sub("[\t ]+\n", "\n", data) - data = re.sub("\n{3,}", "\n\n", data) - - os.makedirs(os.path.dirname(self.path), exist_ok=True) - with open(self.path, "w+") as file: - file.write(data) - - self.sync() - - def sync(self): - """Sync all sources.""" - for source in self.sources: - source.sync() - - def fetch(self): - """Fetch data from liquidsoap.""" - for source in self.sources: - source.fetch() - - # request.on_air is not ordered: we need to do it manually - self.source = next( - iter( - sorted( - ( - source - for source in self.sources - if source.request_status == "playing" - and source.air_time - ), - key=lambda o: o.air_time, - reverse=True, - ) - ), - None, - ) - - # Process ########################################################## - def get_process_args(self): - return ["liquidsoap", "-v", self.path] - - def check_zombie_process(self): - if not os.path.exists(self.socket_path): - return - - conns = [ - conn - for conn in psutil.net_connections(kind="unix") - if conn.laddr == self.socket_path - ] - for conn in conns: - if conn.pid is not None: - os.kill(conn.pid, signal.SIGKILL) - - def run_process(self): - """Execute the external application with corresponding informations. - - This function must make sure that all needed files have been - generated. - """ - if self.process: - return - - args = self.get_process_args() - if not args: - return - - self.check_zombie_process() - self.process = subprocess.Popen(args, stderr=subprocess.STDOUT) - atexit.register(lambda: self.kill_process()) - - def kill_process(self): - if self.process: - logger.debug( - "kill process %s: %s", - self.process.pid, - " ".join(self.get_process_args()), - ) - self.process.kill() - self.process = None - - def wait_process(self): - """Wait for the process to terminate if there is a process.""" - if self.process: - self.process.wait() - self.process = None - - -class Source(BaseMetadata): - controller = None - """Parent controller.""" - id = None - """Source id.""" - remaining = 0.0 - """Remaining time.""" - status = "stopped" - - @property - def station(self): - return self.controller.station - - def __init__(self, controller=None, id=None, *args, **kwargs): - super().__init__(controller, *args, **kwargs) - self.id = id - - def sync(self): - """Synchronize what should be synchronized.""" - - def fetch(self): - try: - data = self.controller.send(self.id, ".remaining") - if data: - self.remaining = float(data) - except ValueError: - self.remaining = None - - data = self.controller.send(self.id, ".get", parse=True) - if data: - self.validate(data if data and isinstance(data, dict) else {}) - - def skip(self): - """Skip the current source sound.""" - self.controller.send(self.id, ".skip") - - def restart(self): - """Restart current sound.""" - # seek 10 hours back since there is not possibility to get current pos - self.seek(-216000 * 10) - - def seek(self, n): - """Seeks into the sound.""" - self.controller.send(self.id, ".seek ", str(n)) - - -class PlaylistSource(Source): - """Source handling playlists (program streams)""" - - path = None - """Path to playlist.""" - program = None - """Related program.""" - playlist = None - """The playlist.""" - - def __init__(self, controller, id=None, program=None, **kwargs): - id = program.slug.replace("-", "_") if id is None else id - self.program = program - - super().__init__(controller, id=id, **kwargs) - self.path = os.path.join(self.station.path, self.id + ".m3u") - - def get_sound_queryset(self): - """Get playlist's sounds queryset.""" - return self.program.sound_set.archive() - - def get_playlist(self): - """Get playlist from db.""" - return self.get_sound_queryset().playlist() - - def write_playlist(self, playlist=[]): - """Write playlist to file.""" - os.makedirs(os.path.dirname(self.path), exist_ok=True) - with open(self.path, "w") as file: - file.write("\n".join(playlist or [])) - - def stream(self): - """Return program's stream info if any (or None) as dict.""" - # used in templates - # TODO: multiple streams - stream = self.program.stream_set.all().first() - if not stream or (not stream.begin and not stream.delay): - return - - return { - "begin": stream.begin.strftime("%Hh%M") if stream.begin else None, - "end": stream.end.strftime("%Hh%M") if stream.end else None, - "delay": to_seconds(stream.delay) if stream.delay else 0, - } - - def sync(self): - playlist = self.get_playlist() - self.write_playlist(playlist) - - -class QueueSource(Source): - queue = None - """Source's queue (excluded on_air request)""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def push(self, *paths): - """Add the provided paths to source's play queue.""" - for path in paths: - self.controller.send(self.id, "_queue.push ", path) - - def fetch(self): - super().fetch() - queue = self.controller.send(self.id, "_queue.queue").strip() - if not queue: - self.queue = [] - return - - self.queue = queue.split(" ") - - @property - def requests(self): - """Queue as requests metadata.""" - requests = [Request(self.controller, rid) for rid in self.queue] - for request in requests: - request.fetch() - return requests diff --git a/aircox_streamer/controllers/__init__.py b/aircox_streamer/controllers/__init__.py new file mode 100644 index 0000000..11598d5 --- /dev/null +++ b/aircox_streamer/controllers/__init__.py @@ -0,0 +1,25 @@ +# TODO: for the moment, update in station and program names do not update the +# related fields. + +from .metadata import Metadata, Request +from .streamer import Streamer +from .streamers import Streamers +from .sources import Source, PlaylistSource, QueueSource +from .monitor import Monitor + + +streamers = Streamers() +"""Default controller used by views and viewsets.""" + + +__all__ = ( + "Metadata", + "Request", + "Streamer", + "Streamers", + "Source", + "PlaylistSource", + "QueueSource", + "Monitor", + "streamers", +) diff --git a/aircox_streamer/controllers/metadata.py b/aircox_streamer/controllers/metadata.py new file mode 100755 index 0000000..6c3ddab --- /dev/null +++ b/aircox_streamer/controllers/metadata.py @@ -0,0 +1,95 @@ +import logging + +import tzlocal +from django.utils import timezone as tz +from django.utils.translation import gettext_lazy as _ + + +__all__ = ( + "Metadata", + "Request", +) + +local_tz = tzlocal.get_localzone() +logger = logging.getLogger("aircox") + + +# FIXME liquidsoap does not manage timezones -- we have to convert +# 'on_air' metadata we get from it into utc one in order to work +# correctly. + + +class Metadata: + """Base class for handling request metadata.""" + + controller = None + """Controller.""" + rid = None + """Request id.""" + uri = None + """Request uri.""" + status = None + """Current playing status.""" + request_status = None + """Requests' status.""" + air_time = None + """Launch datetime.""" + + def __init__(self, controller=None, rid=None, data=None): + self.controller = controller + self.rid = rid + if data is not None: + self.validate(data) + + @property + def is_playing(self): + """True if the source is playing.""" + # FIXME: validate on controller's current source? + return self.status == "playing" + + @property + def status_verbose(self): + """Verbose version of self's status (translated string).""" + status = self.validate_status(self.status) + return _(status) if status else "" + + def fetch(self): + data = self.controller.send("request.metadata ", self.rid, parse=True) + if data: + self.validate(data) + + def validate_status(self, status): + """Return correct status for this metadata based on provided one and + controller. + + :returns: status string + """ + on_air = self.controller.source + if "playing" and on_air and (on_air == self or on_air.rid == self.rid): + return "playing" + elif status in ("paused", "playing"): + return "paused" + else: + return "stopped" + + def validate_air_time(self, air_time): + if air_time: + air_time = tz.datetime.strptime(air_time, "%Y/%m/%d %H:%M:%S") + return local_tz.localize(air_time) + + def validate(self, data): + """Validate provided data and set as attribute (must already be + declared)""" + for key, value in data.items(): + if hasattr(self, key) and not callable(getattr(self, key)): + setattr(self, key, value) + self.uri = data.get("initial_uri") + + self.air_time = self.validate_air_time(data.get("on_air")) + self.status = self.validate_status(data.get("status")) + self.request_status = data.get("status") + + +class Request(Metadata): + title = None + artist = None diff --git a/aircox_streamer/controllers/monitor.py b/aircox_streamer/controllers/monitor.py new file mode 100644 index 0000000..0e8f46a --- /dev/null +++ b/aircox_streamer/controllers/monitor.py @@ -0,0 +1,272 @@ +# TODO: +# x controllers: remaining +# x diffusion conflicts +# x cancel +# x when liquidsoap fails to start/exists: exit +# - handle restart after failure +# - is stream restart after live ok? +from django.utils import timezone as tz + +from aircox.models import Diffusion, Log, Sound, Track + + +class Monitor: + """Log and launch diffusions for the given station. + + Monitor should be able to be used after a crash a go back + where it was playing, so we heavily use logs to be able to + do that. + + We keep trace of played items on the generated stream: + - sounds played on this stream; + - scheduled diffusions + - tracks for sounds of streamed programs + """ + + streamer = None + """Streamer controller.""" + delay = None + """ Timedelta: minimal delay between two call of monitor. """ + logs = None + """Queryset to station's logs (ordered by -pk)""" + cancel_timeout = tz.timedelta(minutes=20) + """Timeout in minutes before cancelling a diffusion.""" + sync_timeout = tz.timedelta(minutes=5) + """Timeout in minutes between two streamer's sync.""" + sync_next = None + """Datetime of the next sync.""" + last_sound_logs = None + """Last logged sounds, as ``{source_id: log}``.""" + + @property + def station(self): + return self.streamer.station + + @property + def last_log(self): + """Last log of monitored station.""" + return self.logs.first() + + @property + def last_diff_start(self): + """Log of last triggered item (sound or diffusion).""" + return self.logs.start().with_diff().first() + + def __init__(self, streamer, delay, **kwargs): + self.streamer = streamer + # adding time ensures all calculations have a margin + self.delay = delay + tz.timedelta(seconds=5) + self.__dict__.update(kwargs) + self.logs = self.get_logs_queryset() + self.init_last_sound_logs() + + def get_logs_queryset(self): + """Return queryset to assign as `self.logs`""" + return self.station.log_set.select_related( + "diffusion", "sound", "track" + ).order_by("-pk") + + def init_last_sound_logs(self): + """Retrieve last logs and initialize `last_sound_logs`""" + logs = {} + for source in self.streamer.sources: + qs = self.logs.filter(source=source.id, sound__isnull=False) + logs[source.id] = qs.first() + self.last_sound_logs = logs + + def monitor(self): + """Run all monitoring functions once.""" + if not self.streamer.is_ready: + return + + self.streamer.fetch() + + # Skip tracing - analyzis: + # Reason: multiple database request every x seconds, reducing it. + # We could skip this part when remaining time is higher than a minimal + # value (which should be derived from Command's delay). Problems: + # - How to trace tracks? (+ Source can change: caching log might sucks) + # - if liquidsoap's source/request changes: remaining time goes higher, + # thus avoiding fetch + # + # Approach: something like having a mean time, such as: + # + # ``` + # source = stream.source + # mean_time = source.air_time + # + min(next_track.timestamp, source.remaining) + # - (command.delay + 1) + # trace_required = \/ source' != source + # \/ source.uri' != source.uri + # \/ now < mean_time + # ``` + # + source = self.streamer.source + if source and source.uri: + log = self.trace_sound(source) + if log: + self.trace_tracks(log) + else: + print("no source or sound for stream; source = ", source) + + self.handle_diffusions() + self.sync() + + def trace_sound(self, source): + """Return on air sound log (create if not present).""" + air_uri, air_time = source.uri, source.air_time + last_log = self.last_sound_logs.get(source.id) + if last_log and last_log.sound.file.path == source.uri: + return last_log + + # FIXME: can be a sound played when no Sound instance? If not, remove + # comment. + # check if there is yet a log for this sound on the source + # log = self.logs.on_air().filter( + # Q(sound__file=air_uri) | + # # sound can be null when arbitrary sound file is played + # Q(sound__isnull=True, track__isnull=True, comment=air_uri), + # source=source.id, + # date__range=date_range(air_time, self.delay), + # ).first() + # if log: + # return log + + # get sound + diff = None + sound = Sound.objects.path(air_uri).first() + if sound and sound.episode_id is not None: + diff = ( + Diffusion.objects.episode(id=sound.episode_id) + .on_air() + .now(air_time) + .first() + ) + + # log sound on air + return self.log( + type=Log.TYPE_ON_AIR, + date=source.air_time, + source=source.id, + sound=sound, + diffusion=diff, + comment=air_uri, + ) + + def trace_tracks(self, log): + """Log tracks for the given sound log (for streamed programs only).""" + if log.diffusion: + return + + tracks = Track.objects.filter( + sound_id=log.sound_id, timestamp__isnull=False + ).order_by("timestamp") + if not tracks.exists(): + return + + # exclude already logged tracks + tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk) + now = tz.now() + for track in tracks: + pos = log.date + tz.timedelta(seconds=track.timestamp) + if pos <= now: + self.log( + type=Log.TYPE_ON_AIR, + date=pos, + source=log.source, + track=track, + comment=track, + ) + + def handle_diffusions(self): + """Handle scheduled diffusion, trigger if needed, preload playlists and + so on.""" + # TODO: program restart + + # Diffusion conflicts are handled by the way a diffusion is defined + # as candidate for the next dealer's start. + # + # ``` + # logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START + # /\ log.diff = diff + # /\ log.date = diff.start + # queue_empty: /\ dealer.queue is empty + # /\ \/ ~dealer.on_air + # \/ dealer.remaining < delay + # + # start_allowed: /\ diff not in logged_diff + # /\ queue_empty + # + # start_canceled: /\ diff not in logged diff + # /\ ~queue_empty + # /\ diff.start < now + cancel_timeout + # ``` + # + now = tz.now() + diff = ( + Diffusion.objects.station(self.station) + .on_air() + .now(now) + .filter(episode__sound__type=Sound.TYPE_ARCHIVE) + .first() + ) + # Can't use delay: diffusion may start later than its assigned start. + log = None if not diff else self.logs.start().filter(diffusion=diff) + if not diff or log: + return + + dealer = self.streamer.dealer + # start + if ( + not dealer.queue + and dealer.rid is None + or dealer.remaining < self.delay.total_seconds() + ): + self.start_diff(dealer, diff) + + # cancel + if diff.start < now - self.cancel_timeout: + self.cancel_diff(dealer, diff) + + def log(self, source, **kwargs): + """Create a log using **kwargs, and print info.""" + kwargs.setdefault("station", self.station) + kwargs.setdefault("date", tz.now()) + log = Log(source=source, **kwargs) + log.save() + log.print() + + if log.sound: + self.last_sound_logs[source] = log + return log + + def start_diff(self, source, diff): + playlist = Sound.objects.episode(id=diff.episode_id).playlist() + source.push(*playlist) + self.log( + type=Log.TYPE_START, + source=source.id, + diffusion=diff, + comment=str(diff), + ) + + def cancel_diff(self, source, diff): + diff.type = Diffusion.TYPE_CANCEL + diff.save() + self.log( + type=Log.TYPE_CANCEL, + source=source.id, + diffusion=diff, + comment=str(diff), + ) + + def sync(self): + """Update sources' playlists.""" + now = tz.now() + if self.sync_next is not None and now < self.sync_next: + return + + self.sync_next = now + self.sync_timeout + + for source in self.streamer.playlists: + source.sync() diff --git a/aircox_streamer/controllers/sources.py b/aircox_streamer/controllers/sources.py new file mode 100755 index 0000000..dc398a6 --- /dev/null +++ b/aircox_streamer/controllers/sources.py @@ -0,0 +1,141 @@ +import os +import tzlocal + +from aircox.utils import to_seconds + +from .metadata import Metadata, Request + + +__all__ = ( + "Source", + "PlaylistSource", + "QueueSource", +) + +local_tz = tzlocal.get_localzone() + + +class Source(Metadata): + controller = None + """Parent controller.""" + id = None + """Source id.""" + remaining = 0.0 + """Remaining time.""" + status = "stopped" + + @property + def station(self): + return self.controller.station + + def __init__(self, controller=None, id=None, *args, **kwargs): + super().__init__(controller, *args, **kwargs) + self.id = id + + def sync(self): + """Synchronize what should be synchronized.""" + + def fetch(self): + try: + data = self.controller.send(self.id, ".remaining") + if data: + self.remaining = float(data) + except ValueError: + self.remaining = None + + data = self.controller.send(self.id, ".get", parse=True) + if data: + self.validate(data if data and isinstance(data, dict) else {}) + + def skip(self): + """Skip the current source sound.""" + self.controller.send(self.id, ".skip") + + def restart(self): + """Restart current sound.""" + # seek 10 hours back since there is not possibility to get current pos + self.seek(-216000 * 10) + + def seek(self, n): + """Seeks into the sound.""" + self.controller.send(self.id, ".seek ", str(n)) + + +class PlaylistSource(Source): + """Source handling playlists (program streams)""" + + path = None + """Path to playlist.""" + program = None + """Related program.""" + playlist = None + """The playlist.""" + + def __init__(self, controller, id=None, program=None, **kwargs): + id = program.slug.replace("-", "_") if id is None else id + self.program = program + + super().__init__(controller, id=id, **kwargs) + self.path = os.path.join(self.station.path, f"{self.id}.m3u") + + def get_sound_queryset(self): + """Get playlist's sounds queryset.""" + return self.program.sound_set.archive() + + def get_playlist(self): + """Get playlist from db.""" + return self.get_sound_queryset().playlist() + + def write_playlist(self, playlist=[]): + """Write playlist to file.""" + os.makedirs(os.path.dirname(self.path), exist_ok=True) + with open(self.path, "w") as file: + file.write("\n".join(playlist or [])) + + def stream(self): + """Return program's stream info if any (or None) as dict.""" + # used in templates + # TODO: multiple streams + stream = self.program.stream_set.all().first() + if not stream or (not stream.begin and not stream.delay): + return + + return { + "begin": stream.begin.strftime("%Hh%M") if stream.begin else None, + "end": stream.end.strftime("%Hh%M") if stream.end else None, + "delay": to_seconds(stream.delay) if stream.delay else 0, + } + + def sync(self): + playlist = self.get_playlist() + self.write_playlist(playlist) + + +class QueueSource(Source): + queue = None + """Source's queue (excluded on_air request)""" + + @property + def requests(self): + """Queue as requests metadata.""" + requests = [Request(self.controller, rid) for rid in self.queue] + for request in requests: + request.fetch() + return requests + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def push(self, *paths): + """Add the provided paths to source's play queue.""" + for path in paths: + self.controller.send(f"{self.id}_queue.push {path}") + + def fetch(self): + super().fetch() + queue = self.controller.send(f"{self.id}_queue.queue").strip() + if not queue: + self.queue = [] + return + + self.queue = queue.split(" ") diff --git a/aircox_streamer/controllers/streamer.py b/aircox_streamer/controllers/streamer.py new file mode 100755 index 0000000..20c4d42 --- /dev/null +++ b/aircox_streamer/controllers/streamer.py @@ -0,0 +1,193 @@ +import atexit +import logging +import os +import re +import signal +import subprocess + +import psutil +from django.template.loader import render_to_string + +from aircox.conf import settings + +from ..connector import Connector +from .sources import PlaylistSource, QueueSource + + +__all__ = ("Streamer",) + +logger = logging.getLogger("aircox") + + +class Streamer: + connector = None + process = None + + station = None + template_name = "aircox_streamer/scripts/station.liq" + path = None + """Config path.""" + sources = None + """List of all monitored sources.""" + source = None + """Current source being played on air.""" + # note: we disable on_air rids since we don't have use of it for the + # moment + # on_air = None + # """ On-air request ids (rid) """ + inputs = None + """Queryset to input ports.""" + outputs = None + """Queryset to output ports.""" + + def __init__(self, station, connector=None): + self.station = station + self.inputs = self.station.port_set.active().input() + self.outputs = self.station.port_set.active().output() + + self.id = self.station.slug.replace("-", "_") + self.path = os.path.join(station.path, "station.liq") + self.connector = connector or Connector( + os.path.join(station.path, "station.sock") + ) + self.init_sources() + + @property + def socket_path(self): + """Path to Unix socket file.""" + return self.connector.address + + @property + def is_ready(self): + """If external program is ready to use, returns True.""" + return self.send("list") != "" + + @property + def is_running(self): + """True if holds a running process.""" + if self.process is None: + return False + + returncode = self.process.poll() + if returncode is None: + return True + + self.process = None + logger.debug("process died with return code %s" % returncode) + return False + + @property + def playlists(self): + return (s for s in self.sources if isinstance(s, PlaylistSource)) + + @property + def queues(self): + return (s for s in self.sources if isinstance(s, QueueSource)) + + # Sources and config ############################################### + def send(self, *args, **kwargs): + return self.connector.send(*args, **kwargs) or "" + + def init_sources(self): + streams = self.station.program_set.filter(stream__isnull=False) + self.dealer = QueueSource(self, "dealer") + self.sources = [self.dealer] + [ + PlaylistSource(self, program=program) for program in streams + ] + + def make_config(self): + """Make configuration files and directory (and sync sources)""" + data = render_to_string( + self.template_name, + { + "station": self.station, + "streamer": self, + "settings": settings, + }, + ) + data = re.sub("[\t ]+\n", "\n", data) + data = re.sub("\n{3,}", "\n\n", data) + + os.makedirs(os.path.dirname(self.path), exist_ok=True) + with open(self.path, "w+") as file: + file.write(data) + + self.sync() + + def sync(self): + """Sync all sources.""" + for source in self.sources: + source.sync() + + def fetch(self): + """Fetch data from liquidsoap.""" + for source in self.sources: + source.fetch() + + # request.on_air is not ordered: we need to do it manually + self.source = next( + iter( + sorted( + ( + source + for source in self.sources + if source.request_status == "playing" + and source.air_time + ), + key=lambda o: o.air_time, + reverse=True, + ) + ), + None, + ) + + # Process ########################################################## + def get_process_args(self): + return ["liquidsoap", "-v", self.path] + + def check_zombie_process(self): + if not os.path.exists(self.socket_path): + return + + conns = [ + conn + for conn in psutil.net_connections(kind="unix") + if conn.laddr == self.socket_path + ] + for conn in conns: + if conn.pid is not None: + os.kill(conn.pid, signal.SIGKILL) + + def run_process(self): + """Execute the external application with corresponding informations. + + This function must make sure that all needed files have been + generated. + """ + if self.process: + return + + args = self.get_process_args() + if not args: + return + + self.check_zombie_process() + self.process = subprocess.Popen(args, stderr=subprocess.STDOUT) + atexit.register(self.kill_process) + + def kill_process(self): + if self.process: + logger.debug( + "kill process %s: %s", + self.process.pid, + " ".join(self.get_process_args()), + ) + self.process.kill() + self.process = None + atexit.unregister(self.kill_process) + + def wait_process(self): + """Wait for the process to terminate if there is a process.""" + if self.process: + self.process.wait() + self.process = None diff --git a/aircox_streamer/controllers/streamers.py b/aircox_streamer/controllers/streamers.py new file mode 100644 index 0000000..3775f9a --- /dev/null +++ b/aircox_streamer/controllers/streamers.py @@ -0,0 +1,62 @@ +from django.utils import timezone as tz + +from aircox.models import Station +from .streamer import Streamer + +__all__ = ("Streamers",) + + +class Streamers: + """Keep multiple streamers in memory, allow fetching informations.""" + + streamers = None + """Stations by station id.""" + streamer_class = Streamer + timeout = None + """Timedelta to next update.""" + next_date = None + """Next update datetime.""" + + def __init__(self, timeout=None, streamer_class=streamer_class): + self.timeout = timeout or tz.timedelta(seconds=2) + + def reset(self, stations=Station.objects.active()): + # FIXME: cf. TODO in aircox.controllers about model updates + stations = stations.all() + self.streamers = { + station.pk: self.streamer_class(station) for station in stations + } + + def fetch(self): + """Call streamers fetch if timed-out.""" + if self.streamers is None: + self.reset() + + now = tz.now() + if self.next_date is not None and now < self.next_date: + return + + for streamer in self.streamers.values(): + streamer.fetch() + self.next_date = now + self.timeout + + def get(self, key, default=None): + return self.streamers.get(key, default) + + def values(self): + return self.streamers.values() + + def __len__(self): + return self.streamers and len(self.streamers) or 0 + + def __getitem__(self, key): + return self.streamers[key] + + def __contains__(self, key): + """Key can be a Station or a Station id.""" + if isinstance(key, Station): + return key.pk in self.streamers + return key in self.streamers + + def __iter__(self): + return self.streamers.values() if self.streamers else iter(tuple()) diff --git a/aircox_streamer/management/commands/streamer.py b/aircox_streamer/management/commands/streamer.py index d94235c..9703f75 100755 --- a/aircox_streamer/management/commands/streamer.py +++ b/aircox_streamer/management/commands/streamer.py @@ -8,290 +8,20 @@ to: """ import time -# TODO: -# x controllers: remaining -# x diffusion conflicts -# x cancel -# x when liquidsoap fails to start/exists: exit -# - handle restart after failure -# - is stream restart after live ok? from argparse import RawTextHelpFormatter import pytz from django.core.management.base import BaseCommand from django.utils import timezone as tz -from aircox.models import Diffusion, Log, Sound, Station, Track -from aircox_streamer.controllers import Streamer +from aircox.models import Station +from aircox_streamer.controllers import Monitor, Streamer + # force using UTC tz.activate(pytz.UTC) -class Monitor: - """Log and launch diffusions for the given station. - - Monitor should be able to be used after a crash a go back - where it was playing, so we heavily use logs to be able to - do that. - - We keep trace of played items on the generated stream: - - sounds played on this stream; - - scheduled diffusions - - tracks for sounds of streamed programs - """ - - streamer = None - """Streamer controller.""" - delay = None - """ Timedelta: minimal delay between two call of monitor. """ - logs = None - """Queryset to station's logs (ordered by -pk)""" - cancel_timeout = 20 - """Timeout in minutes before cancelling a diffusion.""" - sync_timeout = 5 - """Timeout in minutes between two streamer's sync.""" - sync_next = None - """Datetime of the next sync.""" - last_sound_logs = None - """Last logged sounds, as ``{source_id: log}``.""" - - @property - def station(self): - return self.streamer.station - - @property - def last_log(self): - """Last log of monitored station.""" - return self.logs.first() - - @property - def last_diff_start(self): - """Log of last triggered item (sound or diffusion).""" - return self.logs.start().with_diff().first() - - def __init__(self, streamer, delay, cancel_timeout, **kwargs): - self.streamer = streamer - # adding time ensure all calculation have a margin - self.delay = delay + tz.timedelta(seconds=5) - self.cancel_timeout = cancel_timeout - self.__dict__.update(kwargs) - self.logs = self.get_logs_queryset() - self.init_last_sound_logs() - - def get_logs_queryset(self): - """Return queryset to assign as `self.logs`""" - return self.station.log_set.select_related( - "diffusion", "sound", "track" - ).order_by("-pk") - - def init_last_sound_logs(self, key=None): - """Retrieve last logs and initialize `last_sound_logs`""" - logs = {} - for source in self.streamer.sources: - qs = self.logs.filter(source=source.id, sound__isnull=False) - logs[source.id] = qs.first() - self.last_sound_logs = logs - - def monitor(self): - """Run all monitoring functions once.""" - if not self.streamer.is_ready: - return - - self.streamer.fetch() - - # Skip tracing - analyzis: - # Reason: multiple database request every x seconds, reducing it. - # We could skip this part when remaining time is higher than a minimal - # value (which should be derived from Command's delay). Problems: - # - How to trace tracks? (+ Source can change: caching log might sucks) - # - if liquidsoap's source/request changes: remaining time goes higher, - # thus avoiding fetch - # - # Approach: something like having a mean time, such as: - # - # ``` - # source = stream.source - # mean_time = source.air_time - # + min(next_track.timestamp, source.remaining) - # - (command.delay + 1) - # trace_required = \/ source' != source - # \/ source.uri' != source.uri - # \/ now < mean_time - # ``` - # - source = self.streamer.source - if source and source.uri: - log = self.trace_sound(source) - if log: - self.trace_tracks(log) - else: - print("no source or sound for stream; source = ", source) - - self.handle_diffusions() - self.sync() - - def log(self, source, **kwargs): - """Create a log using **kwargs, and print info.""" - kwargs.setdefault("station", self.station) - kwargs.setdefault("date", tz.now()) - log = Log(source=source, **kwargs) - log.save() - log.print() - - if log.sound: - self.last_sound_logs[source] = log - return log - - def trace_sound(self, source): - """Return on air sound log (create if not present).""" - air_uri, air_time = source.uri, source.air_time - last_log = self.last_sound_logs.get(source.id) - if last_log and last_log.sound.file.path == source.uri: - return last_log - - # FIXME: can be a sound played when no Sound instance? If not, remove - # comment. - # check if there is yet a log for this sound on the source - # log = self.logs.on_air().filter( - # Q(sound__file=air_uri) | - # # sound can be null when arbitrary sound file is played - # Q(sound__isnull=True, track__isnull=True, comment=air_uri), - # source=source.id, - # date__range=date_range(air_time, self.delay), - # ).first() - # if log: - # return log - - # get sound - diff = None - sound = Sound.objects.path(air_uri).first() - if sound and sound.episode_id is not None: - diff = ( - Diffusion.objects.episode(id=sound.episode_id) - .on_air() - .now(air_time) - .first() - ) - - # log sound on air - return self.log( - type=Log.TYPE_ON_AIR, - date=source.air_time, - source=source.id, - sound=sound, - diffusion=diff, - comment=air_uri, - ) - - def trace_tracks(self, log): - """Log tracks for the given sound log (for streamed programs only).""" - if log.diffusion: - return - - tracks = Track.objects.filter( - sound__id=log.sound_id, timestamp__isnull=False - ).order_by("timestamp") - if not tracks.exists(): - return - - # exclude already logged tracks - tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk) - now = tz.now() - for track in tracks: - pos = log.date + tz.timedelta(seconds=track.timestamp) - if pos > now: - break - self.log( - type=Log.TYPE_ON_AIR, - date=pos, - source=log.source, - track=track, - comment=track, - ) - - def handle_diffusions(self): - """Handle scheduled diffusion, trigger if needed, preload playlists and - so on.""" - # TODO: program restart - - # Diffusion conflicts are handled by the way a diffusion is defined - # as candidate for the next dealer's start. - # - # ``` - # logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START - # /\ log.diff = diff - # /\ log.date = diff.start - # queue_empty: /\ dealer.queue is empty - # /\ \/ ~dealer.on_air - # \/ dealer.remaining < delay - # - # start_allowed: /\ diff not in logged_diff - # /\ queue_empty - # - # start_canceled: /\ diff not in logged diff - # /\ ~queue_empty - # /\ diff.start < now + cancel_timeout - # ``` - # - now = tz.now() - diff = ( - Diffusion.objects.station(self.station) - .on_air() - .now(now) - .filter(episode__sound__type=Sound.TYPE_ARCHIVE) - .first() - ) - # Can't use delay: diffusion may start later than its assigned start. - log = None if not diff else self.logs.start().filter(diffusion=diff) - if not diff or log: - return - - dealer = self.streamer.dealer - # start - if ( - not dealer.queue - and dealer.rid is None - or dealer.remaining < self.delay.total_seconds() - ): - self.start_diff(dealer, diff) - - # cancel - if diff.start < now - self.cancel_timeout: - self.cancel_diff(dealer, diff) - - def start_diff(self, source, diff): - playlist = Sound.objects.episode(id=diff.episode_id).playlist() - source.push(*playlist) - self.log( - type=Log.TYPE_START, - source=source.id, - diffusion=diff, - comment=str(diff), - ) - - def cancel_diff(self, source, diff): - diff.type = Diffusion.TYPE_CANCEL - diff.save() - self.log( - type=Log.TYPE_CANCEL, - source=source.id, - diffusion=diff, - comment=str(diff), - ) - - def sync(self): - """Update sources' playlists.""" - now = tz.now() - if self.sync_next is not None and now < self.sync_next: - return - - self.sync_next = now + tz.timedelta(minutes=self.sync_timeout) - - for source in self.streamer.playlists: - source.sync() - - class Command(BaseCommand): help = __doc__ @@ -339,7 +69,7 @@ class Command(BaseCommand): "-t", "--timeout", type=float, - default=Monitor.cancel_timeout, + default=Monitor.cancel_timeout.total_seconds() / 60, help="time to wait in MINUTES before canceling a diffusion that " "should have ran but did not. ", ) @@ -377,7 +107,8 @@ class Command(BaseCommand): delay = tz.timedelta(milliseconds=delay) timeout = tz.timedelta(minutes=timeout) monitors = [ - Monitor(streamer, delay, timeout) for streamer in streamers + Monitor(streamer, delay, cancel_timeout=timeout) + for streamer in streamers ] while not run or streamer.is_running: diff --git a/aircox_streamer/serializers.py b/aircox_streamer/serializers.py index ce0ae79..ac3a2a3 100644 --- a/aircox_streamer/serializers.py +++ b/aircox_streamer/serializers.py @@ -22,18 +22,18 @@ class BaseSerializer(serializers.Serializer): return reverse(self.url_name, kwargs=kwargs) -class BaseMetadataSerializer(BaseSerializer): +class MetadataSerializer(BaseSerializer): rid = serializers.IntegerField() air_time = serializers.DateTimeField() uri = serializers.CharField() -class RequestSerializer(BaseMetadataSerializer): +class RequestSerializer(MetadataSerializer): title = serializers.CharField(required=False) artist = serializers.CharField(required=False) -class SourceSerializer(BaseMetadataSerializer): +class SourceSerializer(MetadataSerializer): id = serializers.CharField() uri = serializers.CharField() rid = serializers.IntegerField() diff --git a/aircox_streamer/tests/__init__.py b/aircox_streamer/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aircox_streamer/tests/conftest.py b/aircox_streamer/tests/conftest.py new file mode 100644 index 0000000..b38e766 --- /dev/null +++ b/aircox_streamer/tests/conftest.py @@ -0,0 +1,337 @@ +import itertools +import os + +from datetime import datetime, time +import tzlocal + +import pytest +from model_bakery import baker + +from aircox import models +from aircox_streamer import controllers +from aircox_streamer.connector import Connector + + +local_tz = tzlocal.get_localzone() + + +working_dir = os.path.join(os.path.dirname(__file__), "working_dir") + + +def interface_wrap(obj, attr, value): + if not isinstance(getattr(obj, "calls", None), dict): + obj.calls = {} + obj.calls[attr] = None + + def wrapper(*a, **kw): + call = obj.calls.get(attr) + if call is None: + obj.calls[attr] = (a, kw) + elif isinstance(call, tuple): + obj.calls[attr] = [call, (a, kw)] + else: + call.append((a, kw)) + return value + + setattr(obj, attr, wrapper) + + +def interface(obj, funcs): + """Override provided object's functions using dict of funcs, as ``{ + func_name: return_value}``. + + Attribute ``obj.calls`` is a dict + with all call done using those methods, as + ``{func_name: (args, kwargs)}``. + """ + for attr, value in funcs.items(): + interface_wrap(obj, attr, value) + + +class FakeSocket: + FAILING_ADDRESS = -1 + """Connect with this address fails.""" + + family, type, address = None, None, None + sent_data = None + """List of data that have been `send[all]`""" + recv_data = None + """Response data to return on recv.""" + + def __init__(self, family, type): + self.family = family + self.type = type + self.sent_data = [] + self.recv_data = "" + + def connect(self, address): + if address == self.FAILING_ADDRESS: + raise RuntimeError("invalid connection") + self.address = address + + def close(self): + pass + + def sendall(self, data): + self.sent_data.append(data.decode()) + + def recv(self, count): + if isinstance(self.recv_data, list): + if len(self.recv_data): + data, self.recv_data = self.recv_data[0], self.recv_data[1:] + else: + data = "" + else: + data = self.recv_data + self.recv_data = self.recv_data[count:] + data = data[:count] + return ( + data.encode("utf-8") if isinstance(data, str) else data + ) or b"\nEND" + + def is_sent(self, data): + """Return True if provided data have been sent.""" + # use [:-1] because connector add "\n" at sent data + return any(r for r in self.sent_data if r == data or r[:-1] == data) + + +# -- models +@pytest.fixture +def station(): + station = models.Station( + name="test", path=working_dir, default=True, active=True + ) + station.save() + return station + + +@pytest.fixture +def stations(station): + objs = [ + models.Station( + name=f"test-{i}", + slug=f"test-{i}", + path=working_dir, + default=(i == 0), + active=True, + ) + for i in range(0, 3) + ] + models.Station.objects.bulk_create(objs) + return [station] + objs + + +@pytest.fixture +def station_ports(station): + return _stations_ports(station) + + +@pytest.fixture +def stations_ports(stations): + return _stations_ports(*stations) + + +def _stations_ports(*stations): + items = list( + itertools.chain( + *[ + ( + models.Port( + station=station, + direction=models.Port.DIRECTION_INPUT, + type=models.Port.TYPE_HTTP, + active=True, + ), + models.Port( + station=station, + direction=models.Port.DIRECTION_OUTPUT, + type=models.Port.TYPE_FILE, + active=True, + ), + ) + for station in stations + ] + ) + ) + models.Port.objects.bulk_create(items) + return items + + +@pytest.fixture +def program(station): + program = models.Program(title="test", station=station) + program.save() + return program + + +@pytest.fixture +def stream(program): + stream = models.Stream( + program=program, begin=time(10, 12), end=time(12, 13) + ) + stream.save() + return stream + + +@pytest.fixture +def episode(program): + return baker.make(models.Episode, title="test episode", program=program) + + +@pytest.fixture +def sound(program, episode): + sound = models.Sound( + program=program, + episode=episode, + name="sound", + type=models.Sound.TYPE_ARCHIVE, + position=0, + file="sound.mp3", + ) + sound.save(check=False) + return sound + + +@pytest.fixture +def sounds(program): + items = [ + models.Sound( + name=f"sound {i}", + program=program, + type=models.Sound.TYPE_ARCHIVE, + position=i, + file=f"sound-{i}.mp3", + ) + for i in range(0, 3) + ] + models.Sound.objects.bulk_create(items) + return items + + +# -- connectors +@pytest.fixture +def connector(): + obj = Connector(os.path.join(working_dir, "test.sock")) + obj.socket_class = FakeSocket + yield obj + obj.close() + + +@pytest.fixture +def fail_connector(): + obj = Connector(FakeSocket.FAILING_ADDRESS) + obj.socket_class = FakeSocket + yield obj + obj.close() + + +@pytest.fixture +def controller(station, connector): + connector.open() + return controllers.Streamer(station, connector) + + +@pytest.fixture +def socket(controller): + return controller.connector.socket + + +# -- metadata +@pytest.fixture +def metadata(controller): + return controllers.Metadata(controller, 1) + + +@pytest.fixture +def metadata_data_air_time(): + return local_tz.localize(datetime(2023, 5, 1, 12, 10, 5)) + + +@pytest.fixture +def metadata_data(metadata_data_air_time): + return { + "rid": 1, + "initial_uri": "request_uri", + "on_air": metadata_data_air_time.strftime("%Y/%m/%d %H:%M:%S"), + "status": "playing", + } + + +@pytest.fixture +def metadata_string(metadata_data): + return ( + "\n".join(f"{key}={value}" for key, value in metadata_data.items()) + + "\nEND" + ) + + +# -- streamers +class FakeStreamer(controllers.Streamer): + calls = {} + is_ready = False + + def __init__(self, **kwargs): + self.__dict__.update(**kwargs) + + def fetch(self): + self.calls["fetch"] = True + + +class FakeSource(controllers.Source): + def __init__(self, id, *args, **kwargs): + self.id = id + self.args = args + self.kwargs = kwargs + self.calls = {} + + def fetch(self): + self.calls["sync"] = True + + def sync(self): + self.calls["sync"] = True + + def push(self, *path): + self.calls["push"] = path + return path + + def skip(self): + self.calls["skip"] = True + + def restart(self): + self.calls["restart"] = True + + def seek(self, c): + self.calls["seek"] = c + + +class FakePlaylist(FakeSource, controllers.PlaylistSource): + pass + + +class FakeQueueSource(FakeSource, controllers.QueueSource): + pass + + +@pytest.fixture +def streamer(station, station_ports): + streamer = FakeStreamer(station=station) + streamer.sources = [ + FakePlaylist(i, uri=f"source-{i}") for i in range(0, 3) + ] + streamer.sources.append(FakeQueueSource(len(streamer.sources))) + return streamer + + +@pytest.fixture +def streamers(stations, stations_ports): + streamers = controllers.Streamers(streamer_class=FakeStreamer) + # avoid unecessary db calls + streamers.streamers = { + station.pk: FakeStreamer(station=station) for station in stations + } + for j, streamer in enumerate(streamers.values()): + streamer.sources = [ + FakePlaylist(i, uri=f"source-{j}-{i}") for i in range(0, 3) + ] + streamer.sources.append(FakeQueueSource(len(streamer.sources))) + return streamers diff --git a/aircox_streamer/tests/fake_modules/__init__.py b/aircox_streamer/tests/fake_modules/__init__.py new file mode 100644 index 0000000..e1adc55 --- /dev/null +++ b/aircox_streamer/tests/fake_modules/__init__.py @@ -0,0 +1,39 @@ +import atexit as o_atexit +import subprocess as o_subprocess +import psutil as o_psutil + +from . import atexit, subprocess, psutil + +modules = [ + (o_atexit, atexit, {}), + (o_subprocess, subprocess, {}), + (o_psutil, psutil, {}), +] + + +def init_mappings(): + for original, spoof, mapping in modules: + if mapping: + continue + mapping.update( + { + attr: (getattr(original, attr, None), spoofed) + for attr, spoofed in vars(spoof).items() + if not attr.startswith("_") and hasattr(original, attr) + } + ) + + +def setup(): + for original, spoof, mappings in modules: + for attr, (orig, spoofed) in mappings.items(): + setattr(original, attr, spoofed) + + +def setdown(): + for original, spoof, mappings in modules: + for attr, (orig, spoofed) in mappings.items(): + setattr(original, attr, orig) + + +init_mappings() diff --git a/aircox_streamer/tests/fake_modules/atexit.py b/aircox_streamer/tests/fake_modules/atexit.py new file mode 100644 index 0000000..f38a644 --- /dev/null +++ b/aircox_streamer/tests/fake_modules/atexit.py @@ -0,0 +1,10 @@ +registered = [] +"""Items registered by register()""" + + +def register(func, *args, **kwargs): + registered.append(func) + + +def unregister(func): + registered.remove(func) diff --git a/aircox_streamer/tests/fake_modules/psutil.py b/aircox_streamer/tests/fake_modules/psutil.py new file mode 100644 index 0000000..927b520 --- /dev/null +++ b/aircox_streamer/tests/fake_modules/psutil.py @@ -0,0 +1,15 @@ +"""Spoof psutil module in order to run and check tests.""" + + +class FakeNetConnection: + def __init__(self, laddr, pid=None): + self.laddr = laddr + self.pid = pid + + +def net_connections(*args, **kwargs): + return net_connections.result + + +net_connections.result = [] +"""Result value of net_connections call.""" diff --git a/aircox_streamer/tests/fake_modules/subprocess.py b/aircox_streamer/tests/fake_modules/subprocess.py new file mode 100644 index 0000000..d263b8e --- /dev/null +++ b/aircox_streamer/tests/fake_modules/subprocess.py @@ -0,0 +1,39 @@ +"""Spoof psutil module in order to run and check tests Resulting values of +method calls are set inside `fixtures` module.""" + +STDOUT = 1 +STDERR = 2 +STDIN = 3 + + +class FakeProcess: + args = None + kwargs = None + """Kwargs passed to Popen.""" + killed = False + """kill() have been called.""" + waited = False + """wait() have been called.""" + polled = False + """poll() have been called.""" + poll_result = None + """Result of poll() method.""" + + def __init__(self, args=[], kwargs={}): + self.pid = -13 + self.args = args + self.kwargs = kwargs + + def kill(self): + self.killed = True + + def wait(self): + self.waited = True + + def poll(self): + self.polled = True + return self.poll_result + + +def Popen(args, **kwargs): + return FakeProcess(args, kwargs) diff --git a/aircox_streamer/tests/test_connector.py b/aircox_streamer/tests/test_connector.py new file mode 100644 index 0000000..4f677d5 --- /dev/null +++ b/aircox_streamer/tests/test_connector.py @@ -0,0 +1,70 @@ +import json +import os +import socket + +from .conftest import working_dir + + +class TestConnector: + payload = "non_value_info\n" 'a="value_1"\n' 'b="value_b"\n' "END" + """Test payload.""" + payload_data = {"a": "value_1", "b": "value_b"} + """Resulting data of payload.""" + + def test_open(self, connector): + assert connector.open() == 0 + assert connector.is_open + assert connector.socket.family == socket.AF_UNIX + assert connector.socket.type == socket.SOCK_STREAM + assert connector.socket.address == os.path.join( + working_dir, "test.sock" + ) + connector.close() + + def test_open_af_inet(self, connector): + address = ("test", 30) + connector.address = address + assert connector.open() == 0 + assert connector.is_open + assert connector.socket.family == socket.AF_INET + assert connector.socket.type == socket.SOCK_STREAM + assert connector.socket.address == address + + def test_open_is_already_open(self, connector): + connector.open() + assert connector.open() == 1 + + def test_open_failure(self, fail_connector): + assert fail_connector.open() == -1 + assert fail_connector.socket is None # close() called + + def test_close(self, connector): + connector.open() + assert connector.socket is not None + connector.close() + assert connector.socket is None + + def test_send(self, connector): + connector.open() + connector.socket.recv_data = self.payload + result = connector.send("fake_action", parse=True) + assert result == self.payload_data + + def test_send_open_failure(self, fail_connector): + assert fail_connector.send("fake_action", parse=True) is None + + def test_parse(self, connector): + result = connector.parse(self.payload) + assert result == self.payload_data + + def test_parse_json(self, connector): + # include case where json string is surrounded by '"' + dumps = '"' + json.dumps(self.payload_data) + '"' + result = connector.parse_json(dumps) + assert result == self.payload_data + + def test_parse_json_empty_value(self, connector): + assert connector.parse_json('""') is None + + def test_parse_json_failure(self, connector): + assert connector.parse_json("-- invalid json string --") is None diff --git a/aircox_streamer/tests/test_controllers_metadata.py b/aircox_streamer/tests/test_controllers_metadata.py new file mode 100644 index 0000000..3e940a3 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_metadata.py @@ -0,0 +1,59 @@ +import pytest + +from aircox_streamer.controllers import Metadata + + +class TestBaseMetaData: + @pytest.mark.django_db + def test_is_playing(self, metadata): + metadata.status = "playing" + assert metadata.is_playing + + @pytest.mark.django_db + def test_is_playing_false(self, metadata): + metadata.status = "other" + assert not metadata.is_playing + + @pytest.mark.django_db + def test_fetch(self, controller, metadata, metadata_data, metadata_string): + controller.connector.socket.recv_data = metadata_string + metadata.fetch() + assert metadata.uri == metadata_data["initial_uri"] + + @pytest.mark.django_db + def test_validate_status_playing(self, controller, metadata): + controller.source = metadata + assert metadata.validate_status("playing") == "playing" + + @pytest.mark.django_db + def test_validate_status_paused(self, controller, metadata): + controller.source = Metadata(controller, metadata.rid + 1) + assert metadata.validate_status("playing") == "paused" + + @pytest.mark.django_db + def test_validate_status_stopped(self, controller, metadata): + controller.source = Metadata(controller, 2) + assert metadata.validate_status("") == "stopped" + assert metadata.validate_status("any") == "stopped" + + @pytest.mark.django_db + def test_validate_air_time( + self, metadata, metadata_data, metadata_data_air_time + ): + air_time = metadata_data["on_air"] + result = metadata.validate_air_time(air_time) + assert result == metadata_data_air_time + + @pytest.mark.django_db + def test_validate_air_time_none(self, metadata): + assert metadata.validate_air_time("") is None + + @pytest.mark.django_db + def test_validate(self, metadata, metadata_data, metadata_data_air_time): + metadata.validate(metadata_data) + assert metadata.uri == metadata_data["initial_uri"] + assert metadata.air_time == metadata_data_air_time + # controller.source != metadata + status = "playing" + # => status == "paused" + assert metadata.status == "paused" + assert metadata.request_status == "playing" diff --git a/aircox_streamer/tests/test_controllers_monitor.py b/aircox_streamer/tests/test_controllers_monitor.py new file mode 100644 index 0000000..6390265 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_monitor.py @@ -0,0 +1,251 @@ +from django.utils import timezone as tz + +import pytest +from model_bakery import baker + +from aircox import models +from aircox.test import interface +from aircox_streamer import controllers + + +@pytest.fixture +def monitor(streamer): + streamer.calls = {} + return controllers.Monitor( + streamer, + tz.timedelta(seconds=10), + cancel_timeout=tz.timedelta(minutes=10), + sync_timeout=tz.timedelta(minutes=5), + ) + + +@pytest.fixture +def diffusion(program, episode): + return baker.make( + models.Diffusion, + program=program, + episode=episode, + start=tz.now() - tz.timedelta(minutes=10), + end=tz.now() + tz.timedelta(minutes=30), + schedule=None, + type=models.Diffusion.TYPE_ON_AIR, + ) + + +@pytest.fixture +def source(monitor, streamer, sound, diffusion): + source = next(monitor.streamer.playlists) + source.uri = sound.file.path + source.episode_id = sound.episode_id + source.air_time = diffusion.start + tz.timedelta(seconds=10) + return source + + +@pytest.fixture +def tracks(sound): + items = [ + baker.prepare(models.Track, sound=sound, position=i, timestamp=i * 60) + for i in range(0, 4) + ] + models.Track.objects.bulk_create(items) + return items + + +@pytest.fixture +def log(station, source, sound): + return baker.make( + models.Log, + station=station, + type=models.Log.TYPE_START, + sound=sound, + source=source.id, + ) + + +class TestMonitor: + @pytest.mark.django_db(transaction=True) + def test_last_diff_start(self, monitor): + pass + + @pytest.mark.django_db(transaction=True) + def test___init__(self, monitor): + assert isinstance(monitor.logs, models.LogQuerySet) + assert isinstance(monitor.last_sound_logs, dict) + + @pytest.mark.django_db(transaction=True) + def test_get_logs_queryset(self, monitor, station, sounds): + query = monitor.get_logs_queryset() + assert all(log.station_id == station.pk for log in query) + + @pytest.mark.django_db(transaction=True) + def test_init_last_sound_logs(self, monitor, source, log): + monitor.init_last_sound_logs() + assert monitor.last_sound_logs[source.id] == log + + @pytest.mark.django_db(transaction=True) + def test_monitor(self, monitor, source, log, sound): + monitor.streamer.is_ready = True + monitor.streamer.source = source + interface( + monitor, + { + "trace_sound": log, + "trace_tracks": None, + "handle_diffusions": None, + "sync": None, + }, + ) + + monitor.monitor() + assert monitor.streamer.calls.get("fetch") + assert monitor.calls["trace_sound"] == ((source,), {}) + assert monitor.calls["trace_tracks"] == ((log,), {}) + assert monitor.calls["handle_diffusions"] + assert monitor.calls["sync"] + + @pytest.mark.django_db(transaction=True) + def test_monitor_streamer_not_ready(self, monitor): + monitor.streamer.is_ready = False + interface( + monitor, + { + "trace_sound": log, + "trace_tracks": None, + "handle_diffusions": None, + "sync": None, + }, + ) + + monitor.monitor() + assert not monitor.streamer.calls.get("fetch") + assert monitor.calls["trace_sound"] is None + assert monitor.calls["trace_tracks"] is None + assert not monitor.calls["handle_diffusions"] + assert not monitor.calls["sync"] + + @pytest.mark.django_db(transaction=True) + def test_monitor_no_source_uri(self, monitor, log): + source.uri = None + monitor.streamer.is_ready = True + monitor.streamer.source = source + interface( + monitor, + { + "trace_sound": log, + "trace_tracks": None, + "handle_diffusions": None, + "sync": None, + }, + ) + + monitor.monitor() + assert monitor.streamer.calls.get("fetch") + assert monitor.calls["trace_sound"] is None + assert monitor.calls["trace_tracks"] is None + assert monitor.calls["handle_diffusions"] + assert monitor.calls["sync"] + + @pytest.mark.django_db(transaction=True) + def test_trace_sound(self, monitor, diffusion, source, sound): + monitor.last_sound_logs[source.id] = None + + result = monitor.trace_sound(source) + assert result.type == models.Log.TYPE_ON_AIR + assert result.source == source.id + assert result.sound == sound + assert result.diffusion == diffusion + + @pytest.mark.django_db(transaction=True) + def test_trace_sound_returns_last_log(self, monitor, source, sound, log): + log.sound = sound + monitor.last_sound_logs[source.id] = log + + result = monitor.trace_sound(source) + assert result == log + + @pytest.mark.django_db(transaction=True) + def test_trace_tracks(self, monitor, log, tracks): + interface(monitor, {"log": None}) + for track in tracks: + log.date = tz.now() - tz.timedelta(seconds=track.timestamp + 5) + monitor.trace_tracks(log) + + assert monitor.calls["log"] + log_by_track = [call[1].get("track") for call in monitor.calls["log"]] + # only one call of log + assert all(log_by_track.count(track) for track in tracks) + + @pytest.mark.django_db(transaction=True) + def test_trace_tracks_returns_on_log_diffusion( + self, monitor, log, diffusion, tracks + ): + log.diffusion = None + monitor.trace_tracks(log) + + @pytest.mark.django_db(transaction=True) + def test_trace_tracks_returns_on_no_tracks_exists(self, monitor, log): + log.diffusion = None + monitor.trace_tracks(log) + + @pytest.mark.django_db(transaction=True) + def test_handle_diffusions(self, monitor): + pass + + @pytest.mark.django_db(transaction=True) + def test_log(self, monitor, source): + log = monitor.log("source", type=models.Log.TYPE_START, comment="test") + assert log.source == "source" + assert log.type == models.Log.TYPE_START + assert log.comment == "test" + + @pytest.mark.django_db(transaction=True) + def test_start_diff( + self, monitor, diffusion, source, episode, sound, tracks + ): + result = {} + monitor.log = lambda **kw: result.update(kw) + + monitor.start_diff(source, diffusion) + assert source.calls["push"] == (sound.file.path,) + assert result == { + "type": models.Log.TYPE_START, + "source": source.id, + "diffusion": diffusion, + "comment": str(diffusion), + } + + @pytest.mark.django_db(transaction=True) + def test_cancel_diff(self, monitor, source, diffusion): + result = {} + monitor.log = lambda **kw: result.update(kw) + + monitor.cancel_diff(source, diffusion) + assert diffusion.type == models.Log.TYPE_CANCEL + assert result == { + "type": models.Log.TYPE_CANCEL, + "source": source.id, + "diffusion": diffusion, + "comment": str(diffusion), + } + + @pytest.mark.django_db(transaction=True) + def test_sync(self, monitor): + now = tz.now() + monitor.sync_next = now - tz.timedelta(minutes=1) + monitor.sync() + + assert monitor.sync_next >= now + monitor.sync_timeout + assert all( + source.calls.get("sync") for source in monitor.streamer.playlists + ) + + @pytest.mark.django_db(transaction=True) + def test_sync_timeout_not_reached_skip_sync(self, monitor): + monitor.sync_next = tz.now() + tz.timedelta( + seconds=monitor.sync_timeout.total_seconds() + 20 + ) + monitor.sync() + assert all( + not source.calls.get("sync") + for source in monitor.streamer.playlists + ) diff --git a/aircox_streamer/tests/test_controllers_sources.py b/aircox_streamer/tests/test_controllers_sources.py new file mode 100644 index 0000000..7b5809f --- /dev/null +++ b/aircox_streamer/tests/test_controllers_sources.py @@ -0,0 +1,146 @@ +import os + +import pytest + +from aircox_streamer.controllers import ( + Source, + PlaylistSource, + QueueSource, + Request, +) + + +@pytest.fixture +def source(controller): + return Source(controller, 13) + + +@pytest.fixture +def playlist_source(controller, program): + return PlaylistSource(controller, 14, program) + + +@pytest.fixture +def queue_source(controller): + return QueueSource(controller, 15) + + +class TestSource: + @pytest.mark.django_db + def test_station(self, source, station): + assert source.station == station + + @pytest.mark.django_db + def test_fetch(self, socket, source, metadata_string): + remaining = 3.12 + socket.recv_data = [ + f"{remaining} END", + metadata_string, + ] + + source.fetch() + assert socket.is_sent(f"{source.id}.remaining") + assert socket.is_sent(f"{source.id}.get") + + assert source.remaining == remaining + assert source.request_status + + @pytest.mark.django_db + def test_skip(self, socket, source): + socket.recv_data = "\nEND" + source.skip() + assert socket.is_sent(f"{source.id}.skip\n") + + @pytest.mark.django_db + def test_restart(self, socket, source): + source.restart() + prefix = f"{source.id}.seek" + assert any(r for r in socket.sent_data if r.startswith(prefix)) + + @pytest.mark.django_db + def test_seek(self, socket, source): + source.seek(10) + assert socket.is_sent(f"{source.id}.seek 10") + + +class TestPlaylistSource: + @pytest.mark.django_db + def test_get_sound_queryset(self, playlist_source, sounds): + query = playlist_source.get_sound_queryset() + assert all( + r.program_id == playlist_source.program.pk + and r.type == r.TYPE_ARCHIVE + for r in query + ) + + @pytest.mark.django_db + def test_get_playlist(self, playlist_source, sounds): + expected = {r.file.path for r in sounds} + query = playlist_source.get_playlist() + assert all(r in expected for r in query) + + @pytest.mark.django_db + def test_write_playlist(self, playlist_source): + playlist = ["/tmp/a", "/tmp/b"] + playlist_source.write_playlist(playlist) + with open(playlist_source.path, "r") as file: + result = file.read() + os.remove(playlist_source.path) + + assert result == "\n".join(playlist) + + @pytest.mark.django_db + def test_stream(self, playlist_source, stream): + result = playlist_source.stream() + assert result == { + "begin": stream.begin.strftime("%Hh%M"), + "end": stream.end.strftime("%Hh%M"), + "delay": 0, + } + + @pytest.mark.django_db + def test_sync(self, playlist_source): + # spoof method + playlist = ["/tmp/a", "/tmp/b"] + written_playlist = [] + playlist_source.get_playlist = lambda: playlist + playlist_source.write_playlist = lambda p: written_playlist.extend(p) + + playlist_source.sync() + assert written_playlist == playlist + + +class TestQueueSource: + @pytest.mark.django_db + def test_requests(self, queue_source, socket, metadata_string): + queue_source.queue = [13, 14, 15] + socket.recv_data = [ + f"{metadata_string}\nEND" for _ in queue_source.queue + ] + + requests = queue_source.requests + + assert all(isinstance(r, Request) for r in requests) + assert all(r.uri for r in requests) + + @pytest.mark.django_db + def test_push(self, queue_source, socket): + paths = ["/tmp/a", "/tmp/b"] + queue_source.push(*paths) + assert all( + socket.is_sent(f"{queue_source.id}_queue.push {path}") + for path in paths + ) + + @pytest.mark.django_db + def test_fetch(self, queue_source, socket, metadata_string): + queue = ["13", "14", "15"] + socket.recv_data = [ + # Source fetch remaining & metadata + "13 END", + metadata_string, + " ".join(queue) + "\nEND", + ] + queue_source.fetch() + assert queue_source.uri + assert queue_source.queue == queue diff --git a/aircox_streamer/tests/test_controllers_streamer.py b/aircox_streamer/tests/test_controllers_streamer.py new file mode 100644 index 0000000..ea828f1 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_streamer.py @@ -0,0 +1,150 @@ +import os + +import pytest + +from aircox_streamer import controllers +from . import fake_modules +from .fake_modules import atexit, subprocess, psutil + + +class FakeSource: + synced = False + + def sync(self): + self.synced = True + + +@pytest.fixture +def streamer(station, connector, station_ports, stream): + fake_modules.setup() + streamer = controllers.Streamer(station, connector) + psutil.net_connections.result = [ + psutil.FakeNetConnection(streamer.socket_path, None), + ] + yield streamer + fake_modules.setdown() + + +class TestStreamer: + @pytest.mark.django_db + def test_socket_path(self, streamer): + assert streamer.socket_path == streamer.connector.address + + @pytest.mark.django_db + def test_is_ready(self, streamer, socket): + socket.recv_data = "item 1\nEND" + assert streamer.is_ready + + @pytest.mark.django_db + def test_is_ready_false(self, streamer, socket): + socket.recv_data = "" + assert not streamer.is_ready + + @pytest.mark.django_db + def test_is_running(self, streamer): + streamer.process = subprocess.FakeProcess() + streamer.process.poll_result = None + assert streamer.is_running + + @pytest.mark.django_db + def test_is_running_no_process(self, streamer): + streamer.process = None + assert not streamer.is_running + + @pytest.mark.django_db + def test_is_running_process_died(self, streamer): + process = subprocess.FakeProcess() + process.poll_result = 1 + streamer.process = process + assert not streamer.is_running + assert streamer.process is None + assert process.polled + + @pytest.mark.django_db + def test_playlists(self, streamer, program): + result = list(streamer.playlists) + assert len(result) == 1 + + result = result[0] + assert isinstance(result, controllers.PlaylistSource) + assert result.program == program + + @pytest.mark.django_db + def test_queues(self, streamer): + result = list(streamer.queues) + assert len(result) == 1 + assert result[0] == streamer.dealer + + @pytest.mark.django_db + def test_init_sources(self, streamer, program): + streamer.init_sources() + assert isinstance(streamer.dealer, controllers.QueueSource) + # one for dealer, one for program + assert len(streamer.sources) == 2 + assert streamer.sources[1].program == program + + @pytest.mark.django_db + def test_make_config(self, streamer): + streamer.make_config() + assert os.path.exists(streamer.path) + + @pytest.mark.django_db + def test_sync(self, streamer): + streamer.sources = [FakeSource(), FakeSource()] + streamer.sync() + assert all(source.synced for source in streamer.sources) + + @pytest.mark.django_db + def test_fetch(self, streamer): + pass + + @pytest.mark.django_db + def test_get_process_args(self, streamer): + assert streamer.get_process_args() == [ + "liquidsoap", + "-v", + streamer.path, + ] + + @pytest.mark.django_db + def test_check_zombie_process(self, streamer): + with open(streamer.socket_path, "w+") as file: + file.write("data") + # This test is incomplete, but we can not go further because os module + # is not spoofed (too much work) to check if os.kill is called. + streamer.check_zombie_process() + + @pytest.mark.django_db + def test_check_zombie_process_no_socket(self, streamer): + if os.path.exists(streamer.socket_path): + os.remove(streamer.socket_path) + streamer.check_zombie_process() + + @pytest.mark.django_db + def test_run_process(self, streamer): + if os.path.exists(streamer.socket_path): + os.remove(streamer.socket_path) + streamer.run_process() + process = streamer.process + + assert process.args == streamer.get_process_args() + assert streamer.kill_process in atexit.registered + + @pytest.mark.django_db + def test_kill_process(self, streamer): + streamer.run_process() + process = streamer.process + streamer.kill_process() + + assert process.killed + assert streamer.process is None + assert streamer.kill_process not in atexit.registered + + @pytest.mark.django_db + def test_wait_process(self, streamer): + process = subprocess.FakeProcess() + streamer.process = process + streamer.wait_process() + + assert process.waited + assert streamer.process is None diff --git a/aircox_streamer/tests/test_controllers_streamers.py b/aircox_streamer/tests/test_controllers_streamers.py new file mode 100644 index 0000000..b488043 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_streamers.py @@ -0,0 +1,37 @@ +from datetime import timedelta + +from django.utils import timezone as tz +import pytest + + +class TestStreamers: + @pytest.fixture + def test___init__(self, streamers): + assert isinstance(streamers.timeout, timedelta) + + @pytest.fixture + def test_reset(self, streamers, stations): + streamers.reset() + assert all( + streamers.streamers[station.pk] == station for station in stations + ) + + @pytest.fixture + def test_fetch(self, streamers): + streamers.next_date = tz.now() - tz.timedelta(seconds=30) + streamers.streamers = None + + now = tz.now() + streamers.fetch() + + assert all(streamer.calls.get("fetch") for streamer in streamers) + assert streamers.next_date > now + + @pytest.fixture + def test_fetch_timeout_not_reached(self, streamers): + next_date = tz.now() + tz.timedelta(seconds=30) + streamers.next_date = next_date + + streamers.fetch() + assert all(not streamer.calls.get("fetch") for streamer in streamers) + assert streamers.next_date == next_date diff --git a/aircox_streamer/tests/test_viewsets.py b/aircox_streamer/tests/test_viewsets.py new file mode 100644 index 0000000..5493aaa --- /dev/null +++ b/aircox_streamer/tests/test_viewsets.py @@ -0,0 +1,185 @@ +import pytest + +from django.http import Http404 + +from rest_framework.exceptions import ValidationError +from aircox_streamer.viewsets import ( + ControllerViewSet, + SourceViewSet, + StreamerViewSet, + QueueSourceViewSet, +) + + +class FakeSerializer: + def __init__(self, instance, *args, **kwargs): + self.instance = instance + self.data = {"instance": self.instance} + self.args = args + self.kwargs = kwargs + + +class FakeRequest: + def __init__(self, **kwargs): + self.__dict__.update(**kwargs) + + +@pytest.fixture +def controller_viewset(streamers, station): + return ControllerViewSet( + streamers=streamers, + streamer=streamers[station.pk], + serializer_class=FakeSerializer, + ) + + +@pytest.fixture +def streamer_viewset(streamers, station): + return StreamerViewSet( + streamers=streamers, + streamer=streamers[station.pk], + serializer_class=FakeSerializer, + ) + + +@pytest.fixture +def source_viewset(streamers, station): + return SourceViewSet( + streamers=streamers, + streamer=streamers[station.pk], + serializer_class=FakeSerializer, + ) + + +@pytest.fixture +def queue_source_viewset(streamers, station): + return QueueSourceViewSet( + streamers=streamers, + streamer=streamers[station.pk], + serializer_class=FakeSerializer, + ) + + +class TestControllerViewSet: + @pytest.mark.django_db + def test_get_streamer(self, controller_viewset, stations): + station = stations[0] + streamer = controller_viewset.get_streamer(station.pk) + assert streamer.station.pk == station.pk + assert streamer.calls.get("fetch") + + @pytest.mark.django_db + def test_get_streamer_station_not_found(self, controller_viewset): + controller_viewset.streamers.streamers = {} + with pytest.raises(Http404): + controller_viewset.get_streamer(1) + + @pytest.mark.django_db + def test_get_serializer(self, controller_viewset): + controller_viewset.object = {"object": "value"} + serializer = controller_viewset.get_serializer(test=True) + assert serializer.kwargs["test"] + assert serializer.instance == controller_viewset.object + + @pytest.mark.django_db + def test_serialize(self, controller_viewset): + instance = {} + data = controller_viewset.serialize(instance, test=True) + assert data == {"instance": instance} + + +class TestStreamerViewSet: + @pytest.mark.django_db + def test_retrieve(self, streamer_viewset): + streamer_viewset.streamer = {"streamer": "test"} + resp = streamer_viewset.retrieve(None, None) + assert resp.data == {"instance": streamer_viewset.streamer} + + @pytest.mark.django_db + def test_list(self, streamer_viewset): + streamers = {"a": 1, "b": 2} + streamer_viewset.streamers.streamers = streamers + resp = streamer_viewset.list(None) + assert set(resp.data["results"]["instance"]) == set(streamers.values()) + + +class TestSourceViewSet: + @pytest.mark.django_db + def test_get_sources(self, source_viewset, streamers): + source_viewset.streamer.sources.append(45) + sources = source_viewset.get_sources() + assert 45 not in set(sources) + + @pytest.mark.django_db + def test_get_source(self, source_viewset): + source = source_viewset.get_source(1) + assert source.id == 1 + + @pytest.mark.django_db + def test_get_source_not_found(self, source_viewset): + with pytest.raises(Http404): + source_viewset.get_source(1000) + + @pytest.mark.django_db + def test_retrieve(self, source_viewset, station): + resp = source_viewset.retrieve(None, 0) + source = source_viewset.streamers[station.pk].sources[0] + # this is FakeSerializer being used which provides us the proof + assert resp.data["instance"] == source + + @pytest.mark.django_db + def test_list(self, source_viewset, station): + sources = source_viewset.streamers[station.pk].sources + resp = source_viewset.list(None) + assert list(resp.data["results"]["instance"]) == sources + + @pytest.mark.django_db + def test__run(self, source_viewset): + calls = {} + + def action(x): + return calls.setdefault("action", True) + + source_viewset._run(0, action) + assert calls.get("action") + + @pytest.mark.django_db + def test_all_api_source_actions(self, source_viewset, station): + source = source_viewset.streamers[station.pk].sources[0] + request = FakeRequest(POST={"seek": 1}) + source_viewset.get_source = lambda x: source + + for action in ("sync", "skip", "restart", "seek"): + func = getattr(source_viewset, action) + func(request, 1) + assert source.calls.get(action) + + +class TestQueueSourceViewSet: + @pytest.mark.django_db + def test_get_sound_queryset(self, queue_source_viewset, station, sounds): + ids = {sound.pk for sound in sounds} + request = FakeRequest(station=station) + query = queue_source_viewset.get_sound_queryset(request) + assert set(query.values_list("pk", flat=True)) == ids + + @pytest.mark.django_db + def test_push(self, queue_source_viewset, station, sounds): + calls = {} + sound = sounds[0] + request = FakeRequest(station=station, data={"sound_id": sound.pk}) + queue_source_viewset._run = lambda pk, func: calls.setdefault( + "_run", (pk, func) + ) + result = queue_source_viewset.push(request, 13) + assert "_run" in calls + assert result[0] == 13 + assert callable(result[1]) + + @pytest.mark.django_db + def test_push_missing_sound_in_request_post( + self, queue_source_viewset, station + ): + request = FakeRequest(station=station, data={}) + with pytest.raises(ValidationError): + queue_source_viewset.push(request, 0) diff --git a/aircox_streamer/tests/working_dir/keepme.txt b/aircox_streamer/tests/working_dir/keepme.txt new file mode 100644 index 0000000..e69de29 diff --git a/aircox_streamer/urls.py b/aircox_streamer/urls.py index 2a159b6..fdd86ec 100644 --- a/aircox_streamer/urls.py +++ b/aircox_streamer/urls.py @@ -4,11 +4,11 @@ from django.utils.translation import gettext_lazy as _ from aircox.viewsets import SoundViewSet from . import viewsets -from .views import StreamerAdminMixin +from .views import StreamerAdminView admin.site.route_view( "tools/streamer", - StreamerAdminMixin.as_view(), + StreamerAdminView.as_view(), "tools-streamer", label=_("Streamer Monitor"), ) diff --git a/aircox_streamer/views.py b/aircox_streamer/views.py index 03df665..43832e0 100644 --- a/aircox_streamer/views.py +++ b/aircox_streamer/views.py @@ -2,8 +2,17 @@ from django.utils.translation import gettext_lazy as _ from django.views.generic import TemplateView from aircox.views.admin import AdminMixin +from .controllers import streamers -class StreamerAdminMixin(AdminMixin, TemplateView): +class StreamerAdminView(AdminMixin, TemplateView): template_name = "aircox_streamer/streamer.html" title = _("Streamer Monitor") + streamers = streamers + + def dispatch(self, *args, **kwargs): + # Note: this might raise concurrency racing problem with viewsets, + # since streamers.streamers is reset to a new dict. Still am i not + # sure, and needs analysis. + self.streamers.reset() + return super().dispatch(*args, **kwargs) diff --git a/aircox_streamer/viewsets.py b/aircox_streamer/viewsets.py index 1a11be9..bd63567 100644 --- a/aircox_streamer/viewsets.py +++ b/aircox_streamer/viewsets.py @@ -1,15 +1,15 @@ from django.http import Http404 from django.shortcuts import get_object_or_404 -from django.utils import timezone as tz from rest_framework import viewsets from rest_framework.decorators import action from rest_framework.exceptions import ValidationError from rest_framework.permissions import IsAdminUser from rest_framework.response import Response -from aircox.models import Sound, Station +from aircox.models import Sound from . import controllers + from .serializers import ( PlaylistSerializer, QueueSourceSerializer, @@ -19,8 +19,7 @@ from .serializers import ( ) __all__ = [ - "Streamers", - "BaseControllerAPIView", + "ControllerViewSet", "RequestViewSet", "StreamerViewSet", "SourceViewSet", @@ -29,94 +28,45 @@ __all__ = [ ] -class Streamers: - date = None - """Next update datetime.""" - streamers = None - """Stations by station id.""" - timeout = None - """Timedelta to next update.""" - - def __init__(self, timeout=None): - self.timeout = timeout or tz.timedelta(seconds=2) - - def load(self, force=False): - # FIXME: cf. TODO in aircox.controllers about model updates - stations = Station.objects.active() - if self.streamers is None or force: - self.streamers = { - station.pk: controllers.Streamer(station) - for station in stations - } - return - - streamers = self.streamers - self.streamers = { - station.pk: controllers.Streamer(station) - if station.pk in streamers - else streamers[station.pk] - for station in stations - } - - def fetch(self): - if self.streamers is None: - self.load() - - now = tz.now() - if self.date is not None and now < self.date: - return - - for streamer in self.streamers.values(): - streamer.fetch() - self.date = now + self.timeout - - def get(self, key, default=None): - return self.streamers.get(key, default) - - def values(self): - return self.streamers.values() - - def __getitem__(self, key): - return self.streamers[key] - - def __contains__(self, key): - return key in self.streamers - - -streamers = Streamers() - - -class BaseControllerAPIView(viewsets.ViewSet): +class ControllerViewSet(viewsets.ViewSet): permission_classes = (IsAdminUser,) serializer_class = None + streamers = controllers.streamers + """Streamers controller instance.""" streamer = None + """User's Streamer instance.""" object = None + """Object to serialize.""" - def get_streamer(self, request, station_pk=None, **kwargs): - streamers.fetch() - id = int(request.station.pk if station_pk is None else station_pk) - if id not in streamers: + def get_streamer(self, station_pk=None): + """Get user's streamer.""" + if station_pk is None: + station_pk = self.request.station.pk + self.streamers.fetch() + if station_pk not in self.streamers: raise Http404("station not found") - return streamers[id] + return self.streamers[station_pk] def get_serializer(self, **kwargs): + """Get serializer instance.""" return self.serializer_class(self.object, **kwargs) def serialize(self, obj, **kwargs): + """Serializer controller data.""" self.object = obj serializer = self.get_serializer(**kwargs) return serializer.data def dispatch(self, request, *args, station_pk=None, **kwargs): - self.streamer = self.get_streamer(request, station_pk, **kwargs) + self.streamer = self.get_streamer(station_pk) return super().dispatch(request, *args, **kwargs) -class RequestViewSet(BaseControllerAPIView): +class RequestViewSet(ControllerViewSet): serializer_class = RequestSerializer -class StreamerViewSet(BaseControllerAPIView): +class StreamerViewSet(ControllerViewSet): serializer_class = StreamerSerializer def retrieve(self, request, pk=None): @@ -124,7 +74,7 @@ class StreamerViewSet(BaseControllerAPIView): def list(self, request, pk=None): return Response( - {"results": self.serialize(streamers.values(), many=True)} + {"results": self.serialize(self.streamers.values(), many=True)} ) def dispatch(self, request, *args, pk=None, **kwargs): @@ -135,7 +85,7 @@ class StreamerViewSet(BaseControllerAPIView): return super().dispatch(request, *args, **kwargs) -class SourceViewSet(BaseControllerAPIView): +class SourceViewSet(ControllerViewSet): serializer_class = SourceSerializer model = controllers.Source @@ -151,8 +101,8 @@ class SourceViewSet(BaseControllerAPIView): return source def retrieve(self, request, pk=None): - self.object = self.get_source(pk) - return Response(self.serialize()) + source = self.get_source(pk) + return Response(self.serialize(source)) def list(self, request): return Response( @@ -192,8 +142,8 @@ class QueueSourceViewSet(SourceViewSet): serializer_class = QueueSourceSerializer model = controllers.QueueSource - def get_sound_queryset(self): - return Sound.objects.station(self.request.station).archive() + def get_sound_queryset(self, request): + return Sound.objects.station(request.station).archive() @action(detail=True, methods=["POST"]) def push(self, request, pk): @@ -201,7 +151,7 @@ class QueueSourceViewSet(SourceViewSet): raise ValidationError('missing "sound_id" POST data') sound = get_object_or_404( - self.get_sound_queryset(), pk=request.data["sound_id"] + self.get_sound_queryset(request), pk=request.data["sound_id"] ) return self._run( pk, lambda s: s.push(sound.file.path) if sound.file.path else None