From 95f5dca683cd1efbe980400ee85b556b68a9fb71 Mon Sep 17 00:00:00 2001 From: bkfox Date: Sat, 17 Jun 2023 23:57:58 +0200 Subject: [PATCH] add missing files + move Monitor class from commands to controllers. --- aircox_streamer/controllers/__init__.py | 3 + aircox_streamer/controllers/monitor.py | 278 ++++++++++++++++++ aircox_streamer/controllers/streamers.py | 62 ++++ .../management/commands/streamer.py | 275 +---------------- 4 files changed, 345 insertions(+), 273 deletions(-) create mode 100644 aircox_streamer/controllers/monitor.py create mode 100644 aircox_streamer/controllers/streamers.py diff --git a/aircox_streamer/controllers/__init__.py b/aircox_streamer/controllers/__init__.py index 8b0bd03..11598d5 100644 --- a/aircox_streamer/controllers/__init__.py +++ b/aircox_streamer/controllers/__init__.py @@ -5,6 +5,7 @@ from .metadata import Metadata, Request from .streamer import Streamer from .streamers import Streamers from .sources import Source, PlaylistSource, QueueSource +from .monitor import Monitor streamers = Streamers() @@ -19,4 +20,6 @@ __all__ = ( "Source", "PlaylistSource", "QueueSource", + "Monitor", + "streamers", ) diff --git a/aircox_streamer/controllers/monitor.py b/aircox_streamer/controllers/monitor.py new file mode 100644 index 0000000..ed37e63 --- /dev/null +++ b/aircox_streamer/controllers/monitor.py @@ -0,0 +1,278 @@ +# TODO: +# x controllers: remaining +# x diffusion conflicts +# x cancel +# x when liquidsoap fails to start/exists: exit +# - handle restart after failure +# - is stream restart after live ok? +import pytz +from django.utils import timezone as tz + +from aircox.models import Diffusion, Log, Sound, Track + +# force using UTC +tz.activate(pytz.UTC) + + +class Monitor: + """Log and launch diffusions for the given station. + + Monitor should be able to be used after a crash a go back + where it was playing, so we heavily use logs to be able to + do that. + + We keep trace of played items on the generated stream: + - sounds played on this stream; + - scheduled diffusions + - tracks for sounds of streamed programs + """ + + streamer = None + """Streamer controller.""" + delay = None + """ Timedelta: minimal delay between two call of monitor. """ + logs = None + """Queryset to station's logs (ordered by -pk)""" + cancel_timeout = 20 + """Timeout in minutes before cancelling a diffusion.""" + sync_timeout = 5 + """Timeout in minutes between two streamer's sync.""" + sync_next = None + """Datetime of the next sync.""" + last_sound_logs = None + """Last logged sounds, as ``{source_id: log}``.""" + + @property + def station(self): + return self.streamer.station + + @property + def last_log(self): + """Last log of monitored station.""" + return self.logs.first() + + @property + def last_diff_start(self): + """Log of last triggered item (sound or diffusion).""" + return self.logs.start().with_diff().first() + + def __init__(self, streamer, delay, cancel_timeout, **kwargs): + self.streamer = streamer + # adding time ensure all calculation have a margin + self.delay = delay + tz.timedelta(seconds=5) + self.cancel_timeout = cancel_timeout + self.__dict__.update(kwargs) + self.logs = self.get_logs_queryset() + self.init_last_sound_logs() + + def get_logs_queryset(self): + """Return queryset to assign as `self.logs`""" + return self.station.log_set.select_related( + "diffusion", "sound", "track" + ).order_by("-pk") + + def init_last_sound_logs(self, key=None): + """Retrieve last logs and initialize `last_sound_logs`""" + logs = {} + for source in self.streamer.sources: + qs = self.logs.filter(source=source.id, sound__isnull=False) + logs[source.id] = qs.first() + self.last_sound_logs = logs + + def monitor(self): + """Run all monitoring functions once.""" + if not self.streamer.is_ready: + return + + self.streamer.fetch() + + # Skip tracing - analyzis: + # Reason: multiple database request every x seconds, reducing it. + # We could skip this part when remaining time is higher than a minimal + # value (which should be derived from Command's delay). Problems: + # - How to trace tracks? (+ Source can change: caching log might sucks) + # - if liquidsoap's source/request changes: remaining time goes higher, + # thus avoiding fetch + # + # Approach: something like having a mean time, such as: + # + # ``` + # source = stream.source + # mean_time = source.air_time + # + min(next_track.timestamp, source.remaining) + # - (command.delay + 1) + # trace_required = \/ source' != source + # \/ source.uri' != source.uri + # \/ now < mean_time + # ``` + # + source = self.streamer.source + if source and source.uri: + log = self.trace_sound(source) + if log: + self.trace_tracks(log) + else: + print("no source or sound for stream; source = ", source) + + self.handle_diffusions() + self.sync() + + def log(self, source, **kwargs): + """Create a log using **kwargs, and print info.""" + kwargs.setdefault("station", self.station) + kwargs.setdefault("date", tz.now()) + log = Log(source=source, **kwargs) + log.save() + log.print() + + if log.sound: + self.last_sound_logs[source] = log + return log + + def trace_sound(self, source): + """Return on air sound log (create if not present).""" + air_uri, air_time = source.uri, source.air_time + last_log = self.last_sound_logs.get(source.id) + if last_log and last_log.sound.file.path == source.uri: + return last_log + + # FIXME: can be a sound played when no Sound instance? If not, remove + # comment. + # check if there is yet a log for this sound on the source + # log = self.logs.on_air().filter( + # Q(sound__file=air_uri) | + # # sound can be null when arbitrary sound file is played + # Q(sound__isnull=True, track__isnull=True, comment=air_uri), + # source=source.id, + # date__range=date_range(air_time, self.delay), + # ).first() + # if log: + # return log + + # get sound + diff = None + sound = Sound.objects.path(air_uri).first() + if sound and sound.episode_id is not None: + diff = ( + Diffusion.objects.episode(id=sound.episode_id) + .on_air() + .now(air_time) + .first() + ) + + # log sound on air + return self.log( + type=Log.TYPE_ON_AIR, + date=source.air_time, + source=source.id, + sound=sound, + diffusion=diff, + comment=air_uri, + ) + + def trace_tracks(self, log): + """Log tracks for the given sound log (for streamed programs only).""" + if log.diffusion: + return + + tracks = Track.objects.filter( + sound__id=log.sound_id, timestamp__isnull=False + ).order_by("timestamp") + if not tracks.exists(): + return + + # exclude already logged tracks + tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk) + now = tz.now() + for track in tracks: + pos = log.date + tz.timedelta(seconds=track.timestamp) + if pos > now: + break + self.log( + type=Log.TYPE_ON_AIR, + date=pos, + source=log.source, + track=track, + comment=track, + ) + + def handle_diffusions(self): + """Handle scheduled diffusion, trigger if needed, preload playlists and + so on.""" + # TODO: program restart + + # Diffusion conflicts are handled by the way a diffusion is defined + # as candidate for the next dealer's start. + # + # ``` + # logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START + # /\ log.diff = diff + # /\ log.date = diff.start + # queue_empty: /\ dealer.queue is empty + # /\ \/ ~dealer.on_air + # \/ dealer.remaining < delay + # + # start_allowed: /\ diff not in logged_diff + # /\ queue_empty + # + # start_canceled: /\ diff not in logged diff + # /\ ~queue_empty + # /\ diff.start < now + cancel_timeout + # ``` + # + now = tz.now() + diff = ( + Diffusion.objects.station(self.station) + .on_air() + .now(now) + .filter(episode__sound__type=Sound.TYPE_ARCHIVE) + .first() + ) + # Can't use delay: diffusion may start later than its assigned start. + log = None if not diff else self.logs.start().filter(diffusion=diff) + if not diff or log: + return + + dealer = self.streamer.dealer + # start + if ( + not dealer.queue + and dealer.rid is None + or dealer.remaining < self.delay.total_seconds() + ): + self.start_diff(dealer, diff) + + # cancel + if diff.start < now - self.cancel_timeout: + self.cancel_diff(dealer, diff) + + def start_diff(self, source, diff): + playlist = Sound.objects.episode(id=diff.episode_id).playlist() + source.push(*playlist) + self.log( + type=Log.TYPE_START, + source=source.id, + diffusion=diff, + comment=str(diff), + ) + + def cancel_diff(self, source, diff): + diff.type = Diffusion.TYPE_CANCEL + diff.save() + self.log( + type=Log.TYPE_CANCEL, + source=source.id, + diffusion=diff, + comment=str(diff), + ) + + def sync(self): + """Update sources' playlists.""" + now = tz.now() + if self.sync_next is not None and now < self.sync_next: + return + + self.sync_next = now + tz.timedelta(minutes=self.sync_timeout) + + for source in self.streamer.playlists: + source.sync() diff --git a/aircox_streamer/controllers/streamers.py b/aircox_streamer/controllers/streamers.py new file mode 100644 index 0000000..3775f9a --- /dev/null +++ b/aircox_streamer/controllers/streamers.py @@ -0,0 +1,62 @@ +from django.utils import timezone as tz + +from aircox.models import Station +from .streamer import Streamer + +__all__ = ("Streamers",) + + +class Streamers: + """Keep multiple streamers in memory, allow fetching informations.""" + + streamers = None + """Stations by station id.""" + streamer_class = Streamer + timeout = None + """Timedelta to next update.""" + next_date = None + """Next update datetime.""" + + def __init__(self, timeout=None, streamer_class=streamer_class): + self.timeout = timeout or tz.timedelta(seconds=2) + + def reset(self, stations=Station.objects.active()): + # FIXME: cf. TODO in aircox.controllers about model updates + stations = stations.all() + self.streamers = { + station.pk: self.streamer_class(station) for station in stations + } + + def fetch(self): + """Call streamers fetch if timed-out.""" + if self.streamers is None: + self.reset() + + now = tz.now() + if self.next_date is not None and now < self.next_date: + return + + for streamer in self.streamers.values(): + streamer.fetch() + self.next_date = now + self.timeout + + def get(self, key, default=None): + return self.streamers.get(key, default) + + def values(self): + return self.streamers.values() + + def __len__(self): + return self.streamers and len(self.streamers) or 0 + + def __getitem__(self, key): + return self.streamers[key] + + def __contains__(self, key): + """Key can be a Station or a Station id.""" + if isinstance(key, Station): + return key.pk in self.streamers + return key in self.streamers + + def __iter__(self): + return self.streamers.values() if self.streamers else iter(tuple()) diff --git a/aircox_streamer/management/commands/streamer.py b/aircox_streamer/management/commands/streamer.py index d94235c..9908fd8 100755 --- a/aircox_streamer/management/commands/streamer.py +++ b/aircox_streamer/management/commands/streamer.py @@ -8,290 +8,19 @@ to: """ import time -# TODO: -# x controllers: remaining -# x diffusion conflicts -# x cancel -# x when liquidsoap fails to start/exists: exit -# - handle restart after failure -# - is stream restart after live ok? from argparse import RawTextHelpFormatter import pytz from django.core.management.base import BaseCommand from django.utils import timezone as tz -from aircox.models import Diffusion, Log, Sound, Station, Track -from aircox_streamer.controllers import Streamer +from aircox.models import Station +from aircox_streamer.controllers import Monitor, Streamer # force using UTC tz.activate(pytz.UTC) -class Monitor: - """Log and launch diffusions for the given station. - - Monitor should be able to be used after a crash a go back - where it was playing, so we heavily use logs to be able to - do that. - - We keep trace of played items on the generated stream: - - sounds played on this stream; - - scheduled diffusions - - tracks for sounds of streamed programs - """ - - streamer = None - """Streamer controller.""" - delay = None - """ Timedelta: minimal delay between two call of monitor. """ - logs = None - """Queryset to station's logs (ordered by -pk)""" - cancel_timeout = 20 - """Timeout in minutes before cancelling a diffusion.""" - sync_timeout = 5 - """Timeout in minutes between two streamer's sync.""" - sync_next = None - """Datetime of the next sync.""" - last_sound_logs = None - """Last logged sounds, as ``{source_id: log}``.""" - - @property - def station(self): - return self.streamer.station - - @property - def last_log(self): - """Last log of monitored station.""" - return self.logs.first() - - @property - def last_diff_start(self): - """Log of last triggered item (sound or diffusion).""" - return self.logs.start().with_diff().first() - - def __init__(self, streamer, delay, cancel_timeout, **kwargs): - self.streamer = streamer - # adding time ensure all calculation have a margin - self.delay = delay + tz.timedelta(seconds=5) - self.cancel_timeout = cancel_timeout - self.__dict__.update(kwargs) - self.logs = self.get_logs_queryset() - self.init_last_sound_logs() - - def get_logs_queryset(self): - """Return queryset to assign as `self.logs`""" - return self.station.log_set.select_related( - "diffusion", "sound", "track" - ).order_by("-pk") - - def init_last_sound_logs(self, key=None): - """Retrieve last logs and initialize `last_sound_logs`""" - logs = {} - for source in self.streamer.sources: - qs = self.logs.filter(source=source.id, sound__isnull=False) - logs[source.id] = qs.first() - self.last_sound_logs = logs - - def monitor(self): - """Run all monitoring functions once.""" - if not self.streamer.is_ready: - return - - self.streamer.fetch() - - # Skip tracing - analyzis: - # Reason: multiple database request every x seconds, reducing it. - # We could skip this part when remaining time is higher than a minimal - # value (which should be derived from Command's delay). Problems: - # - How to trace tracks? (+ Source can change: caching log might sucks) - # - if liquidsoap's source/request changes: remaining time goes higher, - # thus avoiding fetch - # - # Approach: something like having a mean time, such as: - # - # ``` - # source = stream.source - # mean_time = source.air_time - # + min(next_track.timestamp, source.remaining) - # - (command.delay + 1) - # trace_required = \/ source' != source - # \/ source.uri' != source.uri - # \/ now < mean_time - # ``` - # - source = self.streamer.source - if source and source.uri: - log = self.trace_sound(source) - if log: - self.trace_tracks(log) - else: - print("no source or sound for stream; source = ", source) - - self.handle_diffusions() - self.sync() - - def log(self, source, **kwargs): - """Create a log using **kwargs, and print info.""" - kwargs.setdefault("station", self.station) - kwargs.setdefault("date", tz.now()) - log = Log(source=source, **kwargs) - log.save() - log.print() - - if log.sound: - self.last_sound_logs[source] = log - return log - - def trace_sound(self, source): - """Return on air sound log (create if not present).""" - air_uri, air_time = source.uri, source.air_time - last_log = self.last_sound_logs.get(source.id) - if last_log and last_log.sound.file.path == source.uri: - return last_log - - # FIXME: can be a sound played when no Sound instance? If not, remove - # comment. - # check if there is yet a log for this sound on the source - # log = self.logs.on_air().filter( - # Q(sound__file=air_uri) | - # # sound can be null when arbitrary sound file is played - # Q(sound__isnull=True, track__isnull=True, comment=air_uri), - # source=source.id, - # date__range=date_range(air_time, self.delay), - # ).first() - # if log: - # return log - - # get sound - diff = None - sound = Sound.objects.path(air_uri).first() - if sound and sound.episode_id is not None: - diff = ( - Diffusion.objects.episode(id=sound.episode_id) - .on_air() - .now(air_time) - .first() - ) - - # log sound on air - return self.log( - type=Log.TYPE_ON_AIR, - date=source.air_time, - source=source.id, - sound=sound, - diffusion=diff, - comment=air_uri, - ) - - def trace_tracks(self, log): - """Log tracks for the given sound log (for streamed programs only).""" - if log.diffusion: - return - - tracks = Track.objects.filter( - sound__id=log.sound_id, timestamp__isnull=False - ).order_by("timestamp") - if not tracks.exists(): - return - - # exclude already logged tracks - tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk) - now = tz.now() - for track in tracks: - pos = log.date + tz.timedelta(seconds=track.timestamp) - if pos > now: - break - self.log( - type=Log.TYPE_ON_AIR, - date=pos, - source=log.source, - track=track, - comment=track, - ) - - def handle_diffusions(self): - """Handle scheduled diffusion, trigger if needed, preload playlists and - so on.""" - # TODO: program restart - - # Diffusion conflicts are handled by the way a diffusion is defined - # as candidate for the next dealer's start. - # - # ``` - # logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START - # /\ log.diff = diff - # /\ log.date = diff.start - # queue_empty: /\ dealer.queue is empty - # /\ \/ ~dealer.on_air - # \/ dealer.remaining < delay - # - # start_allowed: /\ diff not in logged_diff - # /\ queue_empty - # - # start_canceled: /\ diff not in logged diff - # /\ ~queue_empty - # /\ diff.start < now + cancel_timeout - # ``` - # - now = tz.now() - diff = ( - Diffusion.objects.station(self.station) - .on_air() - .now(now) - .filter(episode__sound__type=Sound.TYPE_ARCHIVE) - .first() - ) - # Can't use delay: diffusion may start later than its assigned start. - log = None if not diff else self.logs.start().filter(diffusion=diff) - if not diff or log: - return - - dealer = self.streamer.dealer - # start - if ( - not dealer.queue - and dealer.rid is None - or dealer.remaining < self.delay.total_seconds() - ): - self.start_diff(dealer, diff) - - # cancel - if diff.start < now - self.cancel_timeout: - self.cancel_diff(dealer, diff) - - def start_diff(self, source, diff): - playlist = Sound.objects.episode(id=diff.episode_id).playlist() - source.push(*playlist) - self.log( - type=Log.TYPE_START, - source=source.id, - diffusion=diff, - comment=str(diff), - ) - - def cancel_diff(self, source, diff): - diff.type = Diffusion.TYPE_CANCEL - diff.save() - self.log( - type=Log.TYPE_CANCEL, - source=source.id, - diffusion=diff, - comment=str(diff), - ) - - def sync(self): - """Update sources' playlists.""" - now = tz.now() - if self.sync_next is not None and now < self.sync_next: - return - - self.sync_next = now + tz.timedelta(minutes=self.sync_timeout) - - for source in self.streamer.playlists: - source.sync() - - class Command(BaseCommand): help = __doc__