From 20694d8a74f371368e0647cfebd49b2c51f79f0b Mon Sep 17 00:00:00 2001 From: bkfox Date: Tue, 17 May 2016 23:21:02 +0200 Subject: [PATCH] work on monitor algorithm --- liquidsoap/management/commands/liquidsoap.py | 119 ++++++++++-------- .../templates/aircox/liquidsoap/station.liq | 10 +- liquidsoap/utils.py | 105 +++++++--------- programs/models.py | 63 +++++++--- 4 files changed, 159 insertions(+), 138 deletions(-) diff --git a/liquidsoap/management/commands/liquidsoap.py b/liquidsoap/management/commands/liquidsoap.py index fc27ffc..e6ca1b5 100644 --- a/liquidsoap/management/commands/liquidsoap.py +++ b/liquidsoap/management/commands/liquidsoap.py @@ -34,6 +34,7 @@ class Monitor: cl.run_source(controller.master) cl.run_dealer(controller) + cl.run_source(controller.dealer) for stream in controller.streams.values(): cl.run_source(stream) @@ -47,61 +48,76 @@ class Monitor: log.save() log.print() - @staticmethod - def expected_diffusion (station, date, on_air): - """ - Return which diffusion should be played now and is not playing - on the given station. - """ - r = [ programs.Diffusion.get_prev(station, date), - programs.Diffusion.get_next(station, date) ] - r = [ diffusion.prefetch_related('sounds')[0] - for diffusion in r if diffusion.count() ] + @classmethod + def __get_prev_diff(cl, source, played_sounds = True): + diff_logs = programs.Log.get_for_related_model(programs.Diffusion) \ + .filter(source = source.id) \ + .order_by('-date') + if played_sounds: + sound_logs = programs.Log.get_for_related_model(programs.Sound) \ + .filter(source = source.id) \ + .order_by('-date') + if not diff_logs: + return - for diffusion in r: - if diffusion.end < date: - continue - - diffusion.playlist = [ sound.path - for sound in diffusion.get_archives() ] - diffusion.playlist.save() - if diffusion.playlist and on_air not in diffusion.playlist: - return diffusion + diff = diff_logs[0].related_object + playlist = diff.playlist + if played_sounds: + diff.played = [ sound.related_object.path + for sound in sound_logs[0:len(playlist)] ] + return diff @classmethod - def run_dealer (cl, controller): - """ - Monitor dealer playlist (if it is time to load) and whether it is time - to trigger the button to start a diffusion. - """ + def run_dealer(cl, controller): + # - this function must recover last state in case of crash + # -> don't store data out of hdd + # - construct gradually the playlist and update it if needed + # -> we force liquidsoap to preload tracks of next diff + # - dealer.on while last logged diff is playing, otherwise off + # - when next diff is now and last diff no more active, play it + # -> log and dealer.on dealer = controller.dealer - playlist = dealer.playlist - on_air = dealer.current_sound now = tz.make_aware(tz.datetime.now()) + playlist = [] - diff = cl.expected_diffusion(controller.station, now, on_air) - if not diff: - return # there is nothing we can do + # - the last logged diff is the last one played, it can be playing + # -> no sound left or the diff is not more current: dealer.off + # -> otherwise, ensure dealer.on + # - played sounds are logged in run_source + prev_diff = cl.__get_prev_diff(dealer) + if prev_diff and prev_diff.is_date_in_my_range(now): + playlist = [ path for path in prev_diff.playlist + if path not in prev_diff.played ] + dealer.on = bool(playlist) + else: + playlist = [] + dealer.on = False - # playlist reload - if dealer.playlist != diff.playlist: - if not playlist or on_air == playlist[-1] or \ - on_air not in playlist: - dealer.on = False - dealer.playlist = diff.playlist - dealer.playlist.save() + # - preload next diffusion's tracks + args = {'start__gt': prev_diff.start } if prev_diff else {} + next_diff = programs.Diffusion \ + .get(controller.station, now, now = True, + sounds__isnull = False, **args) \ + .prefetch_related('sounds') + if next_diff: + next_diff = next_diff[0] + playlist += next_diff.playlist - # run the diff - if dealer.playlist == diff.playlist and diff.start <= now and not dealer.on: + # playlist update + if dealer.playlist != playlist: + dealer.playlist = playlist + + # dealer.on when next_diff.start <= now + if next_diff and not dealer.on and next_diff.start <= now: dealer.on = True for source in controller.streams.values(): source.skip() cl.log( source = dealer.id, date = now, - comment = 'trigger the scheduled diffusion to liquidsoap; ' - 'skip all other streams', - related_object = diff, + comment = 'trigger diffusion to liquidsoap; ' + 'skip other streams', + related_object = next_diff, ) @classmethod @@ -124,9 +140,9 @@ class Monitor: last_log = last_log[0] last_obj = last_log.related_object if type(last_obj) == programs.Sound and on_air == last_obj.path: - if not last_obj.duration or \ - now < log.date + programs_utils.to_timedelta(last_obj.duration): - return + #if not last_obj.duration or \ + # now < last_log.date + to_timedelta(last_obj.duration): + return sound = programs.Sound.objects.filter(path = on_air) if not sound: @@ -136,7 +152,7 @@ class Monitor: cl.log( source = source.id, date = tz.make_aware(tz.datetime.now()), - comment = 'sound has changed', + comment = 'sound changed', related_object = sound or None, ) @@ -198,7 +214,6 @@ class Command (BaseCommand): run = options.get('run') monitor = options.get('on_air') or options.get('monitor') - self.controllers = [ utils.Controller(station, connector = monitor) for station in stations ] @@ -217,7 +232,7 @@ class Command (BaseCommand): def handle_write (self): for controller in self.controllers: - controller.write_data() + controller.write() def handle_run (self): for controller in self.controllers: @@ -227,22 +242,18 @@ class Command (BaseCommand): atexit.register(controller.process.terminate) def handle_monitor (self, options): - controllers = [ - utils.Controller(station) - for station in programs.Station.objects.filter(active = True) - ] - for controller in controllers: + for controller in self.controllers: controller.update() if options.get('on_air'): - for controller in controllers: + for controller in self.controllers: print(controller.id, controller.on_air) return if options.get('monitor'): delay = options.get('delay') / 1000 while True: - for controller in controllers: + for controller in self.controllers: #try: Monitor.run(controller) #except Exception as err: diff --git a/liquidsoap/templates/aircox/liquidsoap/station.liq b/liquidsoap/templates/aircox/liquidsoap/station.liq index 011c4e0..40f118a 100644 --- a/liquidsoap/templates/aircox/liquidsoap/station.liq +++ b/liquidsoap/templates/aircox/liquidsoap/station.liq @@ -31,7 +31,7 @@ set("{{ key|safe }}", {{ value|safe }}) \ at(interactive.bool('{{ source.id }}_on', false), \ interactive_source('{{ source.id }}', playlist.once( \ reload_mode='watch', \ - "{{ source.playlist.path }}", \ + "{{ source.path }}", \ )) \ ), \ {% endif %} @@ -41,21 +41,17 @@ set("{{ key|safe }}", {{ value|safe }}) \ interactive_source("{{ controller.id }}_streams", rotate([ \ {% for source in controller.streams.values %} {% with info=source.stream_info %} - {% with path=source.playlist.path %} {% if info.delay %} - delay({{ info.delay }}., stream("{{ source.id }}", "{{ path }}")), \ + delay({{ info.delay }}., stream("{{ source.id }}", "{{ source.path }}")), \ {% elif info.begin and info.end %} - at({ {{info.begin}}-{{info.end}} }, stream("{{ source.id }}", "{{ path }}")), \ + at({ {{info.begin}}-{{info.end}} }, stream("{{ source.id }}", "{{ source.path }}")), \ {% endif %} {% endwith %} - {% endwith %} {% endfor %} {% for source in controller.streams.values %} {% if not source.stream_info %} - {% with path=source.playlist.path %} stream("{{ source.id }}", "{{ source.path }}"), \ - {% endwith %} {% endif %} {% endfor %} ])), \ diff --git a/liquidsoap/utils.py b/liquidsoap/utils.py index 70a5e7e..447693d 100644 --- a/liquidsoap/utils.py +++ b/liquidsoap/utils.py @@ -45,7 +45,6 @@ class Connector: self.__socket.connect(self.address) self.__available = True except: - # print('can not connect to liquidsoap socket {}'.format(self.address)) self.__available = False return -1 @@ -94,57 +93,6 @@ class Connector: except: return None - -class Playlist(list): - path = None - - def __init__(self, path = None, items = None, program = None): - self.path = path - self.program = program - if program: - self.load_from_db() - elif path: - self.load() - elif items: - self.extend(items) - - def save(self): - """ - Save data to the playlist file - """ - os.makedirs(os.path.dirname(self.path), exist_ok = True) - with open(self.path, 'w') as file: - file.write('\n'.join(self)) - - def load(self): - """ - Load data from playlist file - """ - if not os.path.exists(self.path): - return - with open(self.path, 'r') as file: - self.clear() - self.extend(file.readlines()) - - def load_from_db(self, clear = True): - """ - Update content from the database using the given program - If clear is True, clear older items, otherwise append to the - current playlist. - If save is True, save the playlist to the playlist file - """ - sounds = programs.Sound.objects.filter( - type = programs.Sound.Type['archive'], - path__startswith = os.path.join( - programs_settings.AIRCOX_SOUND_ARCHIVES_SUBDIR, - self.program.path - ), - # good_quality = True - removed = False - ) - self.clear() - self.extend([sound.path for sound in sounds]) - class BaseSource: id = None name = None @@ -157,7 +105,7 @@ class BaseSource: self.controller = controller def _send(self, *args, **kwargs): - self.controller.connector.send(*args, **kwargs) + return self.controller.connector.send(*args, **kwargs) @property def current_sound(self): @@ -191,7 +139,7 @@ class BaseSource: class Source(BaseSource): - playlist = None # playlist file + __playlist = None # playlist file program = None # related program (if given) is_dealer = False # Source is a dealer metadata = None @@ -208,10 +156,12 @@ class Source(BaseSource): super().__init__(controller, id, name) - path = os.path.join(settings.AIRCOX_LIQUIDSOAP_MEDIA, - station.slug, - self.id + '.m3u') - self.playlist = Playlist(path, program = program) + self.program = program + self.path = os.path.join(settings.AIRCOX_LIQUIDSOAP_MEDIA, + station.slug, + self.id + '.m3u') + if program: + self.playlist_from_db() @property def on(self): @@ -230,6 +180,38 @@ class Source(BaseSource): return self._send('var.set ', self.id, '_on', '=', 'true' if value else 'false') + @property + def playlist(self): + return self.__playlist + + @playlist.setter + def playlist(self, value): + self.__playlist = value + self.write() + + def write(self): + """ + Write stream's data (playlist) + """ + os.makedirs(os.path.dirname(self.path), exist_ok = True) + with open(self.path, 'w') as file: + file.write('\n'.join(self.playlist or [])) + + def playlist_from_db(self): + """ + Update content from the database using the source's program + """ + sounds = programs.Sound.objects.filter( + type = programs.Sound.Type['archive'], + path__startswith = os.path.join( + programs_settings.AIRCOX_SOUND_ARCHIVES_SUBDIR, + self.program.path + ), + # good_quality = True + removed = False + ) + self.playlist = [sound.path for sound in sounds] + def stream_info(self): """ Return a dict with info related to the program's stream. @@ -356,15 +338,14 @@ class Controller: for source in self.streams.values(): source.update() - def write_data(self, playlist = True, config = True): + def write(self, playlist = True, config = True): """ Write stream's playlists, and config """ - os.makedirs(self.path, exist_ok = True) if playlist: for source in self.streams.values(): - source.playlist.save() - self.dealer.playlist.save() + source.write() + self.dealer.write() if not config: return diff --git a/programs/models.py b/programs/models.py index b4eaee1..2520944 100755 --- a/programs/models.py +++ b/programs/models.py @@ -115,6 +115,7 @@ class Sound (Nameable): .replace('.', r'\.') + ')$', recursive = True, blank = True, null = True, + max_length = 256 ) embed = models.TextField( _('embed HTML code'), @@ -594,6 +595,15 @@ class Diffusion (models.Model): def date (self): return self.start + @property + def playlist(self): + """ + List of sounds as playlist + """ + playlist = [ sound.path for sound in self.sounds.all() ] + playlist.sort() + return playlist + def archives_duration (self): """ Get total duration of the archives. May differ from the schedule @@ -615,26 +625,49 @@ class Diffusion (models.Model): return r @classmethod - def get_next (cl, station = None, date = None, **filter_args): + def get (cl, station = None, date = None, + now = False, next = False, prev = False, + **filter_args): """ - Return a queryset with the upcoming diffusions, ordered by - +date - """ - filter_args['start__gte'] = date_or_default(date) - if station: - filter_args['program__station'] = station - return cl.objects.filter(**filter_args).order_by('start') + Return a queryset of diffusions, depending on value of now/next/prev + - now: that have date in their start-end range or start after + - next: that start after date + - prev: that end before date - @classmethod - def get_prev (cl, station = None, date = None, **filter_args): + Diffusions are ordered by +start for now and next; -start for prev """ - Return a queryset with the previous diffusion, ordered by - -date - """ - filter_args['start__lte'] = date_or_default(date) + #FIXME: conflicts? ( + calling functions) + date = date_or_default(date) if station: filter_args['program__station'] = station - return cl.objects.filter(**filter_args).order_by('-start') + + if now: + return cl.objects.filter( + models.Q(start__lte = date, + end__gte = date) | + models.Q(start__gte = date), + **filter_args + ).order_by('start') + + if next: + return cl.objects.filter( + start__gte = date, + **filter_args + ).order_by('start') + + if prev: + return cl.objects.filter( + end__lte = date, + **filter_args + ).order_by('-start') + + + def is_date_in_my_range(self, date): + """ + Return true if the given date is in the diffusion's start-end + range. + """ + return self.start < date_or_default(date) < self.end def get_conflicts (self): """