""" Handle the audio streamer and controls it as we want it to be. It is used to: - generate config files and playlists; - monitor Liquidsoap, logs and scheduled programs; - cancels Diffusions that have an archive but could not have been played; - run Liquidsoap """ import tzlocal import time import re from argparse import RawTextHelpFormatter from django.conf import settings as main_settings from django.core.management.base import BaseCommand, CommandError from django.utils import timezone as tz from django.utils.functional import cached_property from django.db import models from django.db.models import Q from aircox.models import Station, Diffusion, Track, Sound, Log # force using UTC import pytz tz.activate(pytz.UTC) # FIXME liquidsoap does not manage timezones -- we have to convert # 'on_air' metadata we get from it into utc one in order to work # correctly. local_tz = tzlocal.get_localzone() class Monitor: """ Log and launch diffusions for the given station. Monitor should be able to be used after a crash a go back where it was playing, so we heavily use logs to be able to do that. We keep trace of played items on the generated stream: - sounds played on this stream; - scheduled diffusions - tracks for sounds of streamed programs """ station = None streamer = None cancel_timeout = 60*10 """ Time in seconds before a diffusion that have archives is cancelled because it has not been played. """ sync_timeout = 60*10 """ Time in minuts before all stream playlists are checked and updated """ sync_next = None """ Datetime of the next sync """ def get_last_log(self, *args, **kwargs): return self.log_qs.filter(*args, **kwargs).last() @property def log_qs(self): return Log.objects.station(self.station) \ .select_related('diffusion', 'sound') \ .order_by('pk') @property def last_log(self): """ Last log of monitored station """ return self.log_qs.last() @property def last_sound(self): """ Last sound log of monitored station that occurred on_air """ return self.get_last_log(type=Log.Type.on_air, sound__isnull=False) @property def last_diff_start(self): """ Log of last triggered item (sound or diffusion) """ return self.get_last_log(type=Log.Type.start, diffusion__isnull=False) def __init__(self, station, **kwargs): self.station = station self.__dict__.update(kwargs) def monitor(self): """ Run all monitoring functions. """ if not self.streamer: self.streamer = self.station.streamer if not self.streamer.ready(): return self.streamer.fetch() source = self.streamer.source if source and source.sound: log = self.trace_sound(source) if log: self.trace_tracks(log) else: print('no source or sound for stream; source = ', source) self.sync_playlists() self.handle() def log(self, date=None, **kwargs): """ Create a log using **kwargs, and print info """ log = Log(station=self.station, date=date or tz.now(), **kwargs) if log.type == Log.Type.on_air and log.diffusion is None: log.collision = Diffusion.objects.station(log.station) \ .on_air().at(log.date).first() log.save() log.print() return log def trace_sound(self, source): """ Return log for current on_air (create and save it if required). """ sound_path = source.sound air_time = source.air_time # check if there is yet a log for this sound on the source delta = tz.timedelta(seconds=5) air_times = (air_time - delta, air_time + delta) log = self.log_qs.on_air().filter( source=source.id, sound__path=sound_path, date__range=air_times, ).last() if log: return log # get sound sound = Sound.objects.filter(path=sound_path) \ .select_related('diffusion').first() diff = None if sound and sound.diffusion: diff = sound.diffusion.original # check for reruns if not diff.is_date_in_range(air_time) and not diff.initial: diff = Diffusion.objects.at(air_time) \ .on_air().filter(initial=diff).first() # log sound on air return self.log( type=Log.Type.on_air, source=source.id, date=source.on_air, sound=sound, diffusion=diff, # if sound is removed, we keep sound path info comment=sound_path, ) def trace_tracks(self, log): """ Log tracks for the given sound log (for streamed programs only). """ if log.diffusion: return tracks = Track.objects.filter(sound=log.sound, timestamp__isnull=False) if not tracks.exists(): return tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk) now = tz.now() for track in tracks: pos = log.date + tz.timedelta(seconds=track.timestamp) if pos > now: break # log track on air self.log( type=Log.Type.on_air, source=log.source, date=pos, track=track, comment=track, ) def sync_playlists(self): """ Synchronize updated playlists """ now = tz.now() if self.sync_next and self.sync_next < now: return self.sync_next = now + tz.timedelta(seconds=self.sync_timeout) for source in self.station.sources: if source == self.station.dealer: continue playlist = source.program.sound_set.all() \ .filter(type=Sound.Type.archive) \ .values_list('path', flat=True) source.playlist = list(playlist) def trace_canceled(self): """ Check diffusions that should have been played but did not start, and cancel them """ if not self.cancel_timeout: return qs = Diffusions.objects.station(self.station).at().filter( type=Diffusion.Type.normal, sound__type=Sound.Type.archive, ) logs = Log.objects.station(station).on_air().with_diff() date = tz.now() - datetime.timedelta(seconds=self.cancel_timeout) for diff in qs: if logs.filter(diffusion=diff): continue if diff.start < now: diff.type = Diffusion.Type.canceled diff.save() # log canceled diffusion self.log( type=Log.Type.other, diffusion=diff, comment='Diffusion canceled after {} seconds' .format(self.cancel_timeout) ) def __current_diff(self): """ Return a tuple with the currently running diffusion and the items that still have to be played. If there is not, return None """ station = self.station now = tz.now() log = Log.objects.station(station).on_air().with_diff() \ .select_related('diffusion') \ .order_by('date').last() if not log or not log.diffusion.is_date_in_range(now): # not running anymore return None, [] # last sound source change: end of file reached or forced to stop sounds = Log.objects.station(station).on_air().with_sound() \ .filter(date__gte=log.date) \ .order_by('date') if sounds.count() and sounds.last().source != log.source: return None, [] # last diff is still playing: get remaining playlist sounds = sounds \ .filter(source=log.source, pk__gt=log.pk) \ .exclude(sound__type=Sound.Type.removed) remaining = log.diffusion.get_sounds(archive=True) \ .exclude(pk__in=sounds) \ .values_list('path', flat=True) return log.diffusion, list(remaining) def __next_diff(self, diff): """ Return the next diffusion to be played as tuple of (diff, playlist). If diff is given, it is the one to be played right after it. """ station = self.station kwargs = {'start__gte': diff.end} if diff else {} qs = Diffusion.objects.station(station) \ .on_air().at().filter(**kwargs) \ .distinct().order_by('start') diff = qs.first() return (diff, diff and diff.get_playlist(archive=True) or []) def handle_pl_sync(self, source, playlist, diff=None, date=None): """ Update playlist of a source if required, and handle logging when it is needed. - source: source on which it happens - playlist: list of sounds to use to update - diff: related diffusion """ if source.playlist == playlist: return source.playlist = playlist if diff and not diff.is_live(): # log diffusion archive load self.log(type=Log.Type.load, source=source.id, diffusion=diff, date=date, comment='\n'.join(playlist)) def handle_diff_start(self, source, diff, date): """ Enable dealer in order to play a given diffusion if required, handle start of diffusion """ if not diff or diff.start > date: return # TODO: user has not yet put the diffusion sound when diff started # => live logged; what we want: if user put a sound after it # has been logged as live, load and start this sound # live: just log it if diff.is_live(): diff_ = Log.objects.station(self.station) \ .filter(diffusion=diff, type=Log.Type.on_air) if not diff_.count(): # log live diffusion self.log(type=Log.Type.on_air, source=source.id, diffusion=diff, date=date) return # enable dealer if not source.active: source.active = True last_start = self.last_diff_start if not last_start or last_start.diffusion_id != diff.pk: # log triggered diffusion self.log(type=Log.Type.start, source=source.id, diffusion=diff, date=date) def handle(self): """ Handle scheduled diffusion, trigger if needed, preload playlists and so on. """ station = self.station dealer = station.dealer if not dealer: return now = tz.now() # current and next diffs current_diff, remaining_pl = self.__current_diff() next_diff, next_pl = self.__next_diff(current_diff) # playlist dealer.active = bool(remaining_pl) playlist = remaining_pl + next_pl self.handle_pl_sync(dealer, playlist, next_diff, now) self.handle_diff_start(dealer, next_diff, now) class Command (BaseCommand): help = __doc__ def add_arguments(self, parser): parser.formatter_class = RawTextHelpFormatter group = parser.add_argument_group('actions') group.add_argument( '-c', '--config', action='store_true', help='generate configuration files for the stations' ) group.add_argument( '-m', '--monitor', action='store_true', help='monitor the scheduled diffusions and log what happens' ) group.add_argument( '-r', '--run', action='store_true', help='run the required applications for the stations' ) group = parser.add_argument_group('options') group.add_argument( '-d', '--delay', type=int, default=1000, help='time to sleep in MILLISECONDS between two updates when we ' 'monitor' ) group.add_argument( '-s', '--station', type=str, action='append', help='name of the station to monitor instead of monitoring ' 'all stations' ) group.add_argument( '-t', '--timeout', type=int, default=600, help='time to wait in SECONDS before canceling a diffusion that ' 'has not been ran but should have been. If 0, does not ' 'check' ) def handle(self, *args, config=None, run=None, monitor=None, station=[], delay=1000, timeout=600, **options): stations = Station.objects.filter(name__in=station)[:] \ if station else Station.objects.all()[:] for station in stations: # station.prepare() if config and not run: # no need to write it twice station.streamer.push() if run: station.streamer.process_run() if monitor: monitors = [ Monitor(station, cancel_timeout=timeout) for station in stations ] delay = delay / 1000 while True: for monitor in monitors: monitor.monitor() time.sleep(delay) if run: for station in stations: station.controller.process_wait()