work on monitor algorithm

This commit is contained in:
bkfox 2016-05-17 23:21:02 +02:00
parent 29d0929a0c
commit 20694d8a74
4 changed files with 159 additions and 138 deletions

View File

@ -34,6 +34,7 @@ class Monitor:
cl.run_source(controller.master) cl.run_source(controller.master)
cl.run_dealer(controller) cl.run_dealer(controller)
cl.run_source(controller.dealer)
for stream in controller.streams.values(): for stream in controller.streams.values():
cl.run_source(stream) cl.run_source(stream)
@ -47,61 +48,76 @@ class Monitor:
log.save() log.save()
log.print() log.print()
@staticmethod @classmethod
def expected_diffusion (station, date, on_air): def __get_prev_diff(cl, source, played_sounds = True):
""" diff_logs = programs.Log.get_for_related_model(programs.Diffusion) \
Return which diffusion should be played now and is not playing .filter(source = source.id) \
on the given station. .order_by('-date')
""" if played_sounds:
r = [ programs.Diffusion.get_prev(station, date), sound_logs = programs.Log.get_for_related_model(programs.Sound) \
programs.Diffusion.get_next(station, date) ] .filter(source = source.id) \
r = [ diffusion.prefetch_related('sounds')[0] .order_by('-date')
for diffusion in r if diffusion.count() ] if not diff_logs:
return
for diffusion in r: diff = diff_logs[0].related_object
if diffusion.end < date: playlist = diff.playlist
continue if played_sounds:
diff.played = [ sound.related_object.path
diffusion.playlist = [ sound.path for sound in sound_logs[0:len(playlist)] ]
for sound in diffusion.get_archives() ] return diff
diffusion.playlist.save()
if diffusion.playlist and on_air not in diffusion.playlist:
return diffusion
@classmethod @classmethod
def run_dealer (cl, controller): def run_dealer(cl, controller):
""" # - this function must recover last state in case of crash
Monitor dealer playlist (if it is time to load) and whether it is time # -> don't store data out of hdd
to trigger the button to start a diffusion. # - construct gradually the playlist and update it if needed
""" # -> we force liquidsoap to preload tracks of next diff
# - dealer.on while last logged diff is playing, otherwise off
# - when next diff is now and last diff no more active, play it
# -> log and dealer.on
dealer = controller.dealer dealer = controller.dealer
playlist = dealer.playlist
on_air = dealer.current_sound
now = tz.make_aware(tz.datetime.now()) now = tz.make_aware(tz.datetime.now())
playlist = []
diff = cl.expected_diffusion(controller.station, now, on_air) # - the last logged diff is the last one played, it can be playing
if not diff: # -> no sound left or the diff is not more current: dealer.off
return # there is nothing we can do # -> otherwise, ensure dealer.on
# - played sounds are logged in run_source
prev_diff = cl.__get_prev_diff(dealer)
if prev_diff and prev_diff.is_date_in_my_range(now):
playlist = [ path for path in prev_diff.playlist
if path not in prev_diff.played ]
dealer.on = bool(playlist)
else:
playlist = []
dealer.on = False
# playlist reload # - preload next diffusion's tracks
if dealer.playlist != diff.playlist: args = {'start__gt': prev_diff.start } if prev_diff else {}
if not playlist or on_air == playlist[-1] or \ next_diff = programs.Diffusion \
on_air not in playlist: .get(controller.station, now, now = True,
dealer.on = False sounds__isnull = False, **args) \
dealer.playlist = diff.playlist .prefetch_related('sounds')
dealer.playlist.save() if next_diff:
next_diff = next_diff[0]
playlist += next_diff.playlist
# run the diff # playlist update
if dealer.playlist == diff.playlist and diff.start <= now and not dealer.on: if dealer.playlist != playlist:
dealer.playlist = playlist
# dealer.on when next_diff.start <= now
if next_diff and not dealer.on and next_diff.start <= now:
dealer.on = True dealer.on = True
for source in controller.streams.values(): for source in controller.streams.values():
source.skip() source.skip()
cl.log( cl.log(
source = dealer.id, source = dealer.id,
date = now, date = now,
comment = 'trigger the scheduled diffusion to liquidsoap; ' comment = 'trigger diffusion to liquidsoap; '
'skip all other streams', 'skip other streams',
related_object = diff, related_object = next_diff,
) )
@classmethod @classmethod
@ -124,9 +140,9 @@ class Monitor:
last_log = last_log[0] last_log = last_log[0]
last_obj = last_log.related_object last_obj = last_log.related_object
if type(last_obj) == programs.Sound and on_air == last_obj.path: if type(last_obj) == programs.Sound and on_air == last_obj.path:
if not last_obj.duration or \ #if not last_obj.duration or \
now < log.date + programs_utils.to_timedelta(last_obj.duration): # now < last_log.date + to_timedelta(last_obj.duration):
return return
sound = programs.Sound.objects.filter(path = on_air) sound = programs.Sound.objects.filter(path = on_air)
if not sound: if not sound:
@ -136,7 +152,7 @@ class Monitor:
cl.log( cl.log(
source = source.id, source = source.id,
date = tz.make_aware(tz.datetime.now()), date = tz.make_aware(tz.datetime.now()),
comment = 'sound has changed', comment = 'sound changed',
related_object = sound or None, related_object = sound or None,
) )
@ -198,7 +214,6 @@ class Command (BaseCommand):
run = options.get('run') run = options.get('run')
monitor = options.get('on_air') or options.get('monitor') monitor = options.get('on_air') or options.get('monitor')
self.controllers = [ utils.Controller(station, connector = monitor) self.controllers = [ utils.Controller(station, connector = monitor)
for station in stations ] for station in stations ]
@ -217,7 +232,7 @@ class Command (BaseCommand):
def handle_write (self): def handle_write (self):
for controller in self.controllers: for controller in self.controllers:
controller.write_data() controller.write()
def handle_run (self): def handle_run (self):
for controller in self.controllers: for controller in self.controllers:
@ -227,22 +242,18 @@ class Command (BaseCommand):
atexit.register(controller.process.terminate) atexit.register(controller.process.terminate)
def handle_monitor (self, options): def handle_monitor (self, options):
controllers = [ for controller in self.controllers:
utils.Controller(station)
for station in programs.Station.objects.filter(active = True)
]
for controller in controllers:
controller.update() controller.update()
if options.get('on_air'): if options.get('on_air'):
for controller in controllers: for controller in self.controllers:
print(controller.id, controller.on_air) print(controller.id, controller.on_air)
return return
if options.get('monitor'): if options.get('monitor'):
delay = options.get('delay') / 1000 delay = options.get('delay') / 1000
while True: while True:
for controller in controllers: for controller in self.controllers:
#try: #try:
Monitor.run(controller) Monitor.run(controller)
#except Exception as err: #except Exception as err:

View File

@ -31,7 +31,7 @@ set("{{ key|safe }}", {{ value|safe }}) \
at(interactive.bool('{{ source.id }}_on', false), \ at(interactive.bool('{{ source.id }}_on', false), \
interactive_source('{{ source.id }}', playlist.once( \ interactive_source('{{ source.id }}', playlist.once( \
reload_mode='watch', \ reload_mode='watch', \
"{{ source.playlist.path }}", \ "{{ source.path }}", \
)) \ )) \
), \ ), \
{% endif %} {% endif %}
@ -41,21 +41,17 @@ set("{{ key|safe }}", {{ value|safe }}) \
interactive_source("{{ controller.id }}_streams", rotate([ \ interactive_source("{{ controller.id }}_streams", rotate([ \
{% for source in controller.streams.values %} {% for source in controller.streams.values %}
{% with info=source.stream_info %} {% with info=source.stream_info %}
{% with path=source.playlist.path %}
{% if info.delay %} {% if info.delay %}
delay({{ info.delay }}., stream("{{ source.id }}", "{{ path }}")), \ delay({{ info.delay }}., stream("{{ source.id }}", "{{ source.path }}")), \
{% elif info.begin and info.end %} {% elif info.begin and info.end %}
at({ {{info.begin}}-{{info.end}} }, stream("{{ source.id }}", "{{ path }}")), \ at({ {{info.begin}}-{{info.end}} }, stream("{{ source.id }}", "{{ source.path }}")), \
{% endif %} {% endif %}
{% endwith %} {% endwith %}
{% endwith %}
{% endfor %} {% endfor %}
{% for source in controller.streams.values %} {% for source in controller.streams.values %}
{% if not source.stream_info %} {% if not source.stream_info %}
{% with path=source.playlist.path %}
stream("{{ source.id }}", "{{ source.path }}"), \ stream("{{ source.id }}", "{{ source.path }}"), \
{% endwith %}
{% endif %} {% endif %}
{% endfor %} {% endfor %}
])), \ ])), \

View File

@ -45,7 +45,6 @@ class Connector:
self.__socket.connect(self.address) self.__socket.connect(self.address)
self.__available = True self.__available = True
except: except:
# print('can not connect to liquidsoap socket {}'.format(self.address))
self.__available = False self.__available = False
return -1 return -1
@ -94,57 +93,6 @@ class Connector:
except: except:
return None return None
class Playlist(list):
path = None
def __init__(self, path = None, items = None, program = None):
self.path = path
self.program = program
if program:
self.load_from_db()
elif path:
self.load()
elif items:
self.extend(items)
def save(self):
"""
Save data to the playlist file
"""
os.makedirs(os.path.dirname(self.path), exist_ok = True)
with open(self.path, 'w') as file:
file.write('\n'.join(self))
def load(self):
"""
Load data from playlist file
"""
if not os.path.exists(self.path):
return
with open(self.path, 'r') as file:
self.clear()
self.extend(file.readlines())
def load_from_db(self, clear = True):
"""
Update content from the database using the given program
If clear is True, clear older items, otherwise append to the
current playlist.
If save is True, save the playlist to the playlist file
"""
sounds = programs.Sound.objects.filter(
type = programs.Sound.Type['archive'],
path__startswith = os.path.join(
programs_settings.AIRCOX_SOUND_ARCHIVES_SUBDIR,
self.program.path
),
# good_quality = True
removed = False
)
self.clear()
self.extend([sound.path for sound in sounds])
class BaseSource: class BaseSource:
id = None id = None
name = None name = None
@ -157,7 +105,7 @@ class BaseSource:
self.controller = controller self.controller = controller
def _send(self, *args, **kwargs): def _send(self, *args, **kwargs):
self.controller.connector.send(*args, **kwargs) return self.controller.connector.send(*args, **kwargs)
@property @property
def current_sound(self): def current_sound(self):
@ -191,7 +139,7 @@ class BaseSource:
class Source(BaseSource): class Source(BaseSource):
playlist = None # playlist file __playlist = None # playlist file
program = None # related program (if given) program = None # related program (if given)
is_dealer = False # Source is a dealer is_dealer = False # Source is a dealer
metadata = None metadata = None
@ -208,10 +156,12 @@ class Source(BaseSource):
super().__init__(controller, id, name) super().__init__(controller, id, name)
path = os.path.join(settings.AIRCOX_LIQUIDSOAP_MEDIA, self.program = program
station.slug, self.path = os.path.join(settings.AIRCOX_LIQUIDSOAP_MEDIA,
self.id + '.m3u') station.slug,
self.playlist = Playlist(path, program = program) self.id + '.m3u')
if program:
self.playlist_from_db()
@property @property
def on(self): def on(self):
@ -230,6 +180,38 @@ class Source(BaseSource):
return self._send('var.set ', self.id, '_on', '=', return self._send('var.set ', self.id, '_on', '=',
'true' if value else 'false') 'true' if value else 'false')
@property
def playlist(self):
return self.__playlist
@playlist.setter
def playlist(self, value):
self.__playlist = value
self.write()
def write(self):
"""
Write stream's data (playlist)
"""
os.makedirs(os.path.dirname(self.path), exist_ok = True)
with open(self.path, 'w') as file:
file.write('\n'.join(self.playlist or []))
def playlist_from_db(self):
"""
Update content from the database using the source's program
"""
sounds = programs.Sound.objects.filter(
type = programs.Sound.Type['archive'],
path__startswith = os.path.join(
programs_settings.AIRCOX_SOUND_ARCHIVES_SUBDIR,
self.program.path
),
# good_quality = True
removed = False
)
self.playlist = [sound.path for sound in sounds]
def stream_info(self): def stream_info(self):
""" """
Return a dict with info related to the program's stream. Return a dict with info related to the program's stream.
@ -356,15 +338,14 @@ class Controller:
for source in self.streams.values(): for source in self.streams.values():
source.update() source.update()
def write_data(self, playlist = True, config = True): def write(self, playlist = True, config = True):
""" """
Write stream's playlists, and config Write stream's playlists, and config
""" """
os.makedirs(self.path, exist_ok = True)
if playlist: if playlist:
for source in self.streams.values(): for source in self.streams.values():
source.playlist.save() source.write()
self.dealer.playlist.save() self.dealer.write()
if not config: if not config:
return return

View File

@ -115,6 +115,7 @@ class Sound (Nameable):
.replace('.', r'\.') + ')$', .replace('.', r'\.') + ')$',
recursive = True, recursive = True,
blank = True, null = True, blank = True, null = True,
max_length = 256
) )
embed = models.TextField( embed = models.TextField(
_('embed HTML code'), _('embed HTML code'),
@ -594,6 +595,15 @@ class Diffusion (models.Model):
def date (self): def date (self):
return self.start return self.start
@property
def playlist(self):
"""
List of sounds as playlist
"""
playlist = [ sound.path for sound in self.sounds.all() ]
playlist.sort()
return playlist
def archives_duration (self): def archives_duration (self):
""" """
Get total duration of the archives. May differ from the schedule Get total duration of the archives. May differ from the schedule
@ -615,26 +625,49 @@ class Diffusion (models.Model):
return r return r
@classmethod @classmethod
def get_next (cl, station = None, date = None, **filter_args): def get (cl, station = None, date = None,
now = False, next = False, prev = False,
**filter_args):
""" """
Return a queryset with the upcoming diffusions, ordered by Return a queryset of diffusions, depending on value of now/next/prev
+date - now: that have date in their start-end range or start after
""" - next: that start after date
filter_args['start__gte'] = date_or_default(date) - prev: that end before date
if station:
filter_args['program__station'] = station
return cl.objects.filter(**filter_args).order_by('start')
@classmethod Diffusions are ordered by +start for now and next; -start for prev
def get_prev (cl, station = None, date = None, **filter_args):
""" """
Return a queryset with the previous diffusion, ordered by #FIXME: conflicts? ( + calling functions)
-date date = date_or_default(date)
"""
filter_args['start__lte'] = date_or_default(date)
if station: if station:
filter_args['program__station'] = station filter_args['program__station'] = station
return cl.objects.filter(**filter_args).order_by('-start')
if now:
return cl.objects.filter(
models.Q(start__lte = date,
end__gte = date) |
models.Q(start__gte = date),
**filter_args
).order_by('start')
if next:
return cl.objects.filter(
start__gte = date,
**filter_args
).order_by('start')
if prev:
return cl.objects.filter(
end__lte = date,
**filter_args
).order_by('-start')
def is_date_in_my_range(self, date):
"""
Return true if the given date is in the diffusion's start-end
range.
"""
return self.start < date_or_default(date) < self.end
def get_conflicts (self): def get_conflicts (self):
""" """