forked from rc/aircox
rewrite streamer and controller -- much cleaner and efficient; continue to work on new architecture
This commit is contained in:
@ -6,23 +6,18 @@ used to:
|
||||
- cancels Diffusions that have an archive but could not have been played;
|
||||
- run Liquidsoap
|
||||
"""
|
||||
import tzlocal
|
||||
import time
|
||||
import re
|
||||
|
||||
from argparse import RawTextHelpFormatter
|
||||
import time
|
||||
|
||||
from django.conf import settings as main_settings
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
import pytz
|
||||
import tzlocal
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.utils import timezone as tz
|
||||
from django.utils.functional import cached_property
|
||||
from django.db import models
|
||||
from django.db.models import Q
|
||||
|
||||
from aircox.models import Station, Diffusion, Track, Sound, Log
|
||||
from aircox.models import Station, Episode, Diffusion, Track, Sound, Log
|
||||
from aircox.controllers import Streamer, PlaylistSource
|
||||
|
||||
# force using UTC
|
||||
import pytz
|
||||
tz.activate(pytz.UTC)
|
||||
|
||||
|
||||
@ -45,125 +40,91 @@ class Monitor:
|
||||
- scheduled diffusions
|
||||
- tracks for sounds of streamed programs
|
||||
"""
|
||||
station = None
|
||||
streamer = None
|
||||
cancel_timeout = 60*10
|
||||
"""
|
||||
Time in seconds before a diffusion that have archives is cancelled
|
||||
because it has not been played.
|
||||
"""
|
||||
sync_timeout = 60*10
|
||||
"""
|
||||
Time in minuts before all stream playlists are checked and updated
|
||||
"""
|
||||
""" Streamer controller """
|
||||
logs = None
|
||||
""" Queryset to station's logs (ordered by -pk) """
|
||||
cancel_timeout = 20
|
||||
""" Timeout in minutes before cancelling a diffusion. """
|
||||
sync_timeout = 5
|
||||
""" Timeout in minutes between two streamer's sync. """
|
||||
sync_next = None
|
||||
"""
|
||||
Datetime of the next sync
|
||||
"""
|
||||
|
||||
def get_last_log(self, *args, **kwargs):
|
||||
return self.log_qs.filter(*args, **kwargs).last()
|
||||
""" Datetime of the next sync """
|
||||
|
||||
@property
|
||||
def log_qs(self):
|
||||
return Log.objects.station(self.station) \
|
||||
.select_related('diffusion', 'sound') \
|
||||
.order_by('pk')
|
||||
def station(self):
|
||||
return self.streamer.station
|
||||
|
||||
@property
|
||||
def last_log(self):
|
||||
"""
|
||||
Last log of monitored station
|
||||
"""
|
||||
return self.log_qs.last()
|
||||
|
||||
@property
|
||||
def last_sound(self):
|
||||
"""
|
||||
Last sound log of monitored station that occurred on_air
|
||||
"""
|
||||
return self.get_last_log(type=Log.Type.on_air, sound__isnull=False)
|
||||
""" Last log of monitored station. """
|
||||
return self.logs.first()
|
||||
|
||||
@property
|
||||
def last_diff_start(self):
|
||||
"""
|
||||
Log of last triggered item (sound or diffusion)
|
||||
"""
|
||||
return self.get_last_log(type=Log.Type.start, diffusion__isnull=False)
|
||||
""" Log of last triggered item (sound or diffusion). """
|
||||
return self.logs.start().with_diff().first()
|
||||
|
||||
def __init__(self, station, **kwargs):
|
||||
self.station = station
|
||||
def __init__(self, streamer, **kwargs):
|
||||
self.streamer = streamer
|
||||
self.__dict__.update(kwargs)
|
||||
self.logs = self.get_logs_queryset()
|
||||
|
||||
def get_logs_queryset(self):
|
||||
""" Return queryset to assign as `self.logs` """
|
||||
return self.station.log_set.select_related('diffusion', 'sound') \
|
||||
.order_by('-pk')
|
||||
|
||||
def monitor(self):
|
||||
"""
|
||||
Run all monitoring functions.
|
||||
"""
|
||||
if not self.streamer:
|
||||
self.streamer = self.station.streamer
|
||||
|
||||
if not self.streamer.ready():
|
||||
""" Run all monitoring functions once. """
|
||||
if not self.streamer.is_ready:
|
||||
return
|
||||
|
||||
self.streamer.fetch()
|
||||
source = self.streamer.source
|
||||
if source and source.sound:
|
||||
if source and source.uri:
|
||||
log = self.trace_sound(source)
|
||||
if log:
|
||||
self.trace_tracks(log)
|
||||
else:
|
||||
print('no source or sound for stream; source = ', source)
|
||||
|
||||
self.sync_playlists()
|
||||
self.handle()
|
||||
self.handle_diffusions()
|
||||
self.sync()
|
||||
|
||||
def log(self, date=None, **kwargs):
|
||||
""" Create a log using **kwargs, and print info """
|
||||
log = Log(station=self.station, date=date or tz.now(), **kwargs)
|
||||
if log.type == Log.Type.on_air and log.diffusion is None:
|
||||
log.collision = Diffusion.objects.station(log.station) \
|
||||
.on_air().at(log.date).first()
|
||||
|
||||
kwargs.setdefault('station', self.station)
|
||||
log = Log(date=date or tz.now(), **kwargs)
|
||||
log.save()
|
||||
log.print()
|
||||
return log
|
||||
|
||||
def trace_sound(self, source):
|
||||
"""
|
||||
Return log for current on_air (create and save it if required).
|
||||
"""
|
||||
sound_path = source.sound
|
||||
air_time = source.air_time
|
||||
""" Return on air sound log (create if not present). """
|
||||
sound_path, air_time = source.uri, source.air_time
|
||||
|
||||
# check if there is yet a log for this sound on the source
|
||||
delta = tz.timedelta(seconds=5)
|
||||
air_times = (air_time - delta, air_time + delta)
|
||||
|
||||
log = self.log_qs.on_air().filter(
|
||||
source=source.id, sound__path=sound_path,
|
||||
date__range=air_times,
|
||||
).last()
|
||||
log = self.logs.on_air().filter(source=source.id,
|
||||
sound__path=sound_path,
|
||||
date__range=air_times).first()
|
||||
if log:
|
||||
return log
|
||||
|
||||
# get sound
|
||||
sound = Sound.objects.filter(path=sound_path) \
|
||||
.select_related('diffusion').first()
|
||||
diff = None
|
||||
if sound and sound.diffusion:
|
||||
diff = sound.diffusion.original
|
||||
# check for reruns
|
||||
if not diff.is_date_in_range(air_time) and not diff.initial:
|
||||
diff = Diffusion.objects.at(air_time) \
|
||||
.on_air().filter(initial=diff).first()
|
||||
sound = Sound.objects.filter(path=sound_path).first()
|
||||
if sound and sound.episode_id is not None:
|
||||
diff = Diffusion.objects.episode(id=sound.episode_id).on_air() \
|
||||
.now(air_time).first()
|
||||
|
||||
# log sound on air
|
||||
return self.log(
|
||||
type=Log.Type.on_air, source=source.id, date=source.on_air,
|
||||
sound=sound, diffusion=diff,
|
||||
# if sound is removed, we keep sound path info
|
||||
comment=sound_path,
|
||||
)
|
||||
return self.log(type=Log.Type.on_air, date=source.air_time,
|
||||
source=source.id, sound=sound, diffusion=diff,
|
||||
comment=sound_path)
|
||||
|
||||
def trace_tracks(self, log):
|
||||
"""
|
||||
@ -172,10 +133,13 @@ class Monitor:
|
||||
if log.diffusion:
|
||||
return
|
||||
|
||||
tracks = Track.objects.filter(sound=log.sound, timestamp__isnull=False)
|
||||
tracks = Track.objects \
|
||||
.filter(sound__id=log.sound_id, timestamp__isnull=False)\
|
||||
.order_by('timestamp')
|
||||
if not tracks.exists():
|
||||
return
|
||||
|
||||
# exclude already logged tracks
|
||||
tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk)
|
||||
now = tz.now()
|
||||
for track in tracks:
|
||||
@ -183,178 +147,40 @@ class Monitor:
|
||||
if pos > now:
|
||||
break
|
||||
# log track on air
|
||||
self.log(
|
||||
type=Log.Type.on_air, source=log.source,
|
||||
date=pos, track=track,
|
||||
comment=track,
|
||||
)
|
||||
self.log(type=Log.Type.on_air, date=pos, source=log.source,
|
||||
track=track, comment=track)
|
||||
|
||||
def sync_playlists(self):
|
||||
"""
|
||||
Synchronize updated playlists
|
||||
"""
|
||||
now = tz.now()
|
||||
if self.sync_next and self.sync_next < now:
|
||||
return
|
||||
|
||||
self.sync_next = now + tz.timedelta(seconds=self.sync_timeout)
|
||||
|
||||
for source in self.station.sources:
|
||||
if source == self.station.dealer:
|
||||
continue
|
||||
playlist = source.program.sound_set.all() \
|
||||
.filter(type=Sound.Type.archive) \
|
||||
.values_list('path', flat=True)
|
||||
source.playlist = list(playlist)
|
||||
|
||||
def trace_canceled(self):
|
||||
"""
|
||||
Check diffusions that should have been played but did not start,
|
||||
and cancel them
|
||||
"""
|
||||
if not self.cancel_timeout:
|
||||
return
|
||||
|
||||
qs = Diffusions.objects.station(self.station).at().filter(
|
||||
type=Diffusion.Type.on_air,
|
||||
sound__type=Sound.Type.archive,
|
||||
)
|
||||
logs = Log.objects.station(station).on_air().with_diff()
|
||||
|
||||
date = tz.now() - datetime.timedelta(seconds=self.cancel_timeout)
|
||||
for diff in qs:
|
||||
if logs.filter(diffusion=diff):
|
||||
continue
|
||||
if diff.start < now:
|
||||
diff.type = Diffusion.Type.canceled
|
||||
diff.save()
|
||||
# log canceled diffusion
|
||||
self.log(
|
||||
type=Log.Type.other,
|
||||
diffusion=diff,
|
||||
comment='Diffusion canceled after {} seconds'
|
||||
.format(self.cancel_timeout)
|
||||
)
|
||||
|
||||
def __current_diff(self):
|
||||
"""
|
||||
Return a tuple with the currently running diffusion and the items
|
||||
that still have to be played. If there is not, return None
|
||||
"""
|
||||
station = self.station
|
||||
now = tz.now()
|
||||
|
||||
log = Log.objects.station(station).on_air().with_diff() \
|
||||
.select_related('diffusion') \
|
||||
.order_by('date').last()
|
||||
if not log or not log.diffusion.is_date_in_range(now):
|
||||
# not running anymore
|
||||
return None, []
|
||||
|
||||
# last sound source change: end of file reached or forced to stop
|
||||
sounds = Log.objects.station(station).on_air().with_sound() \
|
||||
.filter(date__gte=log.date) \
|
||||
.order_by('date')
|
||||
|
||||
if sounds.count() and sounds.last().source != log.source:
|
||||
return None, []
|
||||
|
||||
# last diff is still playing: get remaining playlist
|
||||
sounds = sounds \
|
||||
.filter(source=log.source, pk__gt=log.pk) \
|
||||
.exclude(sound__type=Sound.Type.removed)
|
||||
|
||||
remaining = log.diffusion.get_sounds(archive=True) \
|
||||
.exclude(pk__in=sounds) \
|
||||
.values_list('path', flat=True)
|
||||
return log.diffusion, list(remaining)
|
||||
|
||||
def __next_diff(self, diff):
|
||||
"""
|
||||
Return the next diffusion to be played as tuple of (diff, playlist).
|
||||
If diff is given, it is the one to be played right after it.
|
||||
"""
|
||||
station = self.station
|
||||
kwargs = {'start__gte': diff.end} if diff else {}
|
||||
qs = Diffusion.objects.station(station) \
|
||||
.on_air().at().filter(**kwargs) \
|
||||
.distinct().order_by('start')
|
||||
diff = qs.first()
|
||||
return (diff, diff and diff.get_playlist(archive=True) or [])
|
||||
|
||||
def handle_pl_sync(self, source, playlist, diff=None, date=None):
|
||||
"""
|
||||
Update playlist of a source if required, and handle logging when
|
||||
it is needed.
|
||||
|
||||
- source: source on which it happens
|
||||
- playlist: list of sounds to use to update
|
||||
- diff: related diffusion
|
||||
"""
|
||||
if source.playlist == playlist:
|
||||
return
|
||||
|
||||
source.playlist = playlist
|
||||
if diff and not diff.is_live():
|
||||
# log diffusion archive load
|
||||
self.log(type=Log.Type.load,
|
||||
source=source.id,
|
||||
diffusion=diff,
|
||||
date=date,
|
||||
comment='\n'.join(playlist))
|
||||
|
||||
def handle_diff_start(self, source, diff, date):
|
||||
"""
|
||||
Enable dealer in order to play a given diffusion if required,
|
||||
handle start of diffusion
|
||||
"""
|
||||
if not diff or diff.start > date:
|
||||
return
|
||||
|
||||
# TODO: user has not yet put the diffusion sound when diff started
|
||||
# => live logged; what we want: if user put a sound after it
|
||||
# has been logged as live, load and start this sound
|
||||
|
||||
# live: just log it
|
||||
if diff.is_live():
|
||||
diff_ = Log.objects.station(self.station) \
|
||||
.filter(diffusion=diff, type=Log.Type.on_air)
|
||||
if not diff_.count():
|
||||
# log live diffusion
|
||||
self.log(type=Log.Type.on_air, source=source.id,
|
||||
diffusion=diff, date=date)
|
||||
return
|
||||
|
||||
# enable dealer
|
||||
if not source.active:
|
||||
source.active = True
|
||||
last_start = self.last_diff_start
|
||||
if not last_start or last_start.diffusion_id != diff.pk:
|
||||
# log triggered diffusion
|
||||
self.log(type=Log.Type.start, source=source.id,
|
||||
diffusion=diff, date=date)
|
||||
|
||||
def handle(self):
|
||||
def handle_diffusions(self):
|
||||
"""
|
||||
Handle scheduled diffusion, trigger if needed, preload playlists
|
||||
and so on.
|
||||
"""
|
||||
station = self.station
|
||||
dealer = station.dealer
|
||||
if not dealer:
|
||||
# TODO: restart
|
||||
# TODO: handle conflict + cancel
|
||||
diff = Diffusion.objects.station(self.station).on_air().now() \
|
||||
.filter(episode__sound__type=Sound.Type.archive) \
|
||||
.first()
|
||||
log = self.logs.start().filter(diffusion=diff) if diff else None
|
||||
if not diff or log:
|
||||
return
|
||||
|
||||
playlist = Sound.objects.episode(id=diff.episode_id).paths()
|
||||
dealer = self.streamer.dealer
|
||||
dealer.queue(*playlist)
|
||||
self.log(type=Log.Type.start, source=dealer.id, diffusion=diff,
|
||||
comment=str(diff))
|
||||
|
||||
def sync(self):
|
||||
""" Update sources' playlists. """
|
||||
now = tz.now()
|
||||
if self.sync_next is not None and now < self.sync_next:
|
||||
return
|
||||
|
||||
# current and next diffs
|
||||
current_diff, remaining_pl = self.__current_diff()
|
||||
next_diff, next_pl = self.__next_diff(current_diff)
|
||||
self.sync_next = now + tz.timedelta(minutes=self.sync_timeout)
|
||||
|
||||
# playlist
|
||||
dealer.active = bool(remaining_pl)
|
||||
playlist = remaining_pl + next_pl
|
||||
|
||||
self.handle_pl_sync(dealer, playlist, next_diff, now)
|
||||
self.handle_diff_start(dealer, next_diff, now)
|
||||
for source in self.streamer.sources:
|
||||
if isinstance(source, PlaylistSource):
|
||||
source.sync()
|
||||
|
||||
|
||||
class Command (BaseCommand):
|
||||
@ -390,32 +216,31 @@ class Command (BaseCommand):
|
||||
)
|
||||
group.add_argument(
|
||||
'-t', '--timeout', type=int,
|
||||
default=600,
|
||||
help='time to wait in SECONDS before canceling a diffusion that '
|
||||
'has not been ran but should have been. If 0, does not '
|
||||
'check'
|
||||
default=Monitor.cancel_timeout,
|
||||
help='time to wait in MINUTES before canceling a diffusion that '
|
||||
'should have ran but did not. '
|
||||
)
|
||||
# TODO: sync-timeout, cancel-timeout
|
||||
|
||||
def handle(self, *args,
|
||||
config=None, run=None, monitor=None,
|
||||
station=[], delay=1000, timeout=600,
|
||||
**options):
|
||||
|
||||
stations = Station.objects.filter(name__in=station)[:] \
|
||||
if station else Station.objects.all()[:]
|
||||
stations = Station.objects.filter(name__in=station) if station else \
|
||||
Station.objects.all()
|
||||
streamers = [Streamer(station) for station in stations]
|
||||
|
||||
for station in stations:
|
||||
# station.prepare()
|
||||
if config and not run: # no need to write it twice
|
||||
station.streamer.push()
|
||||
for streamer in streamers:
|
||||
if config:
|
||||
streamer.make_config()
|
||||
if run:
|
||||
station.streamer.process_run()
|
||||
streamer.run_process()
|
||||
|
||||
if monitor:
|
||||
monitors = [
|
||||
Monitor(station, cancel_timeout=timeout)
|
||||
for station in stations
|
||||
]
|
||||
monitors = [Monitor(streamer, cancel_timeout=timeout)
|
||||
for streamer in streamers]
|
||||
|
||||
delay = delay / 1000
|
||||
while True:
|
||||
for monitor in monitors:
|
||||
@ -423,5 +248,5 @@ class Command (BaseCommand):
|
||||
time.sleep(delay)
|
||||
|
||||
if run:
|
||||
for station in stations:
|
||||
station.controller.process_wait()
|
||||
for streamer in streamers:
|
||||
streamer.wait_process()
|
||||
|
Reference in New Issue
Block a user