diff --git a/Manifest.in b/Manifest.in deleted file mode 100755 index 1001e08..0000000 --- a/Manifest.in +++ /dev/null @@ -1,3 +0,0 @@ -include LICENSE -include README.md - diff --git a/__init__.py b/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/aircox/admin/__init__.py b/aircox/admin/__init__.py index 1954e33..f7fcc8d 100644 --- a/aircox/admin/__init__.py +++ b/aircox/admin/__init__.py @@ -22,7 +22,7 @@ class StationAdmin(admin.ModelAdmin): @admin.register(Log) class LogAdmin(admin.ModelAdmin): - list_display = ['id', 'date', 'station', 'source', 'type', 'diffusion', 'sound', 'track'] + list_display = ['id', 'date', 'station', 'source', 'type', 'comment'] list_filter = ['date', 'source', 'station'] diff --git a/aircox/admin/__pycache__/__init__.cpython-37.pyc b/aircox/admin/__pycache__/__init__.cpython-37.pyc index 70c1708..71693ed 100644 Binary files a/aircox/admin/__pycache__/__init__.cpython-37.pyc and b/aircox/admin/__pycache__/__init__.cpython-37.pyc differ diff --git a/aircox/controllers.py b/aircox/controllers.py index f0b16d6..cf8b7cb 100755 --- a/aircox/controllers.py +++ b/aircox/controllers.py @@ -17,6 +17,10 @@ from .models import Port, Station, Sound from .connector import Connector +# FIXME liquidsoap does not manage timezones -- we have to convert +# 'on_air' metadata we get from it into utc one in order to work +# correctly. + local_tz = tzlocal.get_localzone() logger = logging.getLogger('aircox') @@ -32,7 +36,11 @@ class Streamer: sources = None """ List of all monitored sources """ source = None - """ Current on air source """ + """ Current source being played on air """ + # note: we disable on_air rids since we don't have use of it for the + # moment + # on_air = None + # """ On-air request ids (rid) """ def __init__(self, station): self.station = station @@ -46,6 +54,27 @@ class Streamer: """ Path to Unix socket file """ return self.connector.address + @property + def is_ready(self): + """ + If external program is ready to use, returns True + """ + return self.send('list') != '' + + @property + def is_running(self): + if self.process is None: + return False + + returncode = self.process.poll() + if returncode is None: + return True + + self.process = None + logger.debug('process died with return code %s' % returncode) + return False + + # FIXME: is it really needed as property? @property def inputs(self): """ Return input ports of the station """ @@ -62,13 +91,6 @@ class Streamer: active=True, ) - @property - def is_ready(self): - """ - If external program is ready to use, returns True - """ - return self.send('list') != '' - # Sources and config ############################################### def send(self, *args, **kwargs): return self.connector.send(*args, **kwargs) or '' @@ -98,26 +120,27 @@ class Streamer: def sync(self): """ Sync all sources. """ + if self.process is None: + return + for source in self.sources: source.sync() def fetch(self): """ Fetch data from liquidsoap """ + if self.process is None: + return + for source in self.sources: source.fetch() - rid = self.send('request.on_air').split(' ') - if rid: - rid = rid[-1] - # data = self._send('request.metadata ', rid, parse=True) - # if not data: - # return - pred = lambda s: s.rid == rid - else: - pred = lambda s: s.is_playing + # request.on_air is not ordered: we need to do it manually + if self.dealer.is_playing: + self.source = self.dealer + return - self.source = next((source for source in self.sources if pred(source)), - self.source) + self.source = next((source for source in self.sources + if source.is_playing), None) # Process ########################################################## def get_process_args(self): @@ -152,10 +175,8 @@ class Streamer: def kill_process(self): if self.process: - logger.info("kill process {pid}: {info}".format( - pid=self.process.pid, - info=' '.join(self.get_process_args()) - )) + logger.debug("kill process %s: %s", self.process.pid, + ' '.join(self.get_process_args())) self.process.kill() self.process = None @@ -170,12 +191,19 @@ class Streamer: class Source: controller = None + """ parent controller """ id = None - + """ source id """ uri = '' + """ source uri """ rid = None + """ request id """ air_time = None + """ on air time """ status = None + """ source status """ + remaining = 0.0 + """ remaining time """ @property def station(self): @@ -185,6 +213,10 @@ class Source: def is_playing(self): return self.status == 'playing' + #@property + #def is_on_air(self): + # return self.rid is not None and self.rid in self.controller.on_air + def __init__(self, controller, id=None): self.controller = controller self.id = id @@ -194,14 +226,17 @@ class Source: pass def fetch(self): + data = self.controller.send(self.id, '.remaining') + self.remaining = float(data) + data = self.controller.send(self.id, '.get', parse=True) self.on_metadata(data if data and isinstance(data, dict) else {}) def on_metadata(self, data): """ Update source info from provided request metadata """ - self.rid = data.get('rid') - self.uri = data.get('initial_uri') - self.status = data.get('status') + self.rid = data.get('rid') or None + self.uri = data.get('initial_uri') or None + self.status = data.get('status') or None air_time = data.get('on_air') if air_time: @@ -277,8 +312,17 @@ class PlaylistSource(Source): class QueueSource(Source): - def queue(self, *paths): + queue = None + """ Source's queue (excluded on_air request) """ + + def append(self, *paths): """ Add the provided paths to source's play queue """ for path in paths: - print(self.controller.send(self.id, '_queue.push ', path)) + self.controller.send(self.id, '_queue.push ', path) + + def fetch(self): + super().fetch() + queue = self.controller.send(self.id, '_queue.queue').split(' ') + self.queue = queue + diff --git a/aircox/management/commands/archiver.py b/aircox/management/commands/archiver.py index 06893cf..b652af5 100644 --- a/aircox/management/commands/archiver.py +++ b/aircox/management/commands/archiver.py @@ -12,7 +12,7 @@ from django.utils import timezone as tz import aircox.settings as settings from aircox.models import Log, Station -logger = logging.getLogger('aircox.tools') +logger = logging.getLogger('aircox.commands') class Command (BaseCommand): diff --git a/aircox/management/commands/diffusions.py b/aircox/management/commands/diffusions.py index f75fbd3..0b8085c 100755 --- a/aircox/management/commands/diffusions.py +++ b/aircox/management/commands/diffusions.py @@ -25,7 +25,7 @@ from django.utils import timezone as tz from aircox.models import Schedule, Diffusion -logger = logging.getLogger('aircox.tools') +logger = logging.getLogger('aircox.commands') class Actions: diff --git a/aircox/management/commands/import_playlist.py b/aircox/management/commands/import_playlist.py index 011e221..122aa20 100755 --- a/aircox/management/commands/import_playlist.py +++ b/aircox/management/commands/import_playlist.py @@ -23,7 +23,7 @@ from aircox.models import * __doc__ = __doc__.format(settings=settings) -logger = logging.getLogger('aircox.tools') +logger = logging.getLogger('aircox.commands') class PlaylistImport: diff --git a/aircox/management/commands/sounds_monitor.py b/aircox/management/commands/sounds_monitor.py index 422d08d..2767eaa 100755 --- a/aircox/management/commands/sounds_monitor.py +++ b/aircox/management/commands/sounds_monitor.py @@ -42,7 +42,7 @@ from aircox import settings, utils from aircox.models import Diffusion, Program, Sound from .import_playlist import PlaylistImport -logger = logging.getLogger('aircox.tools') +logger = logging.getLogger('aircox.commands') sound_path_re = re.compile( diff --git a/aircox/management/commands/sounds_quality_check.py b/aircox/management/commands/sounds_quality_check.py index b25b8d4..b2469b0 100755 --- a/aircox/management/commands/sounds_quality_check.py +++ b/aircox/management/commands/sounds_quality_check.py @@ -9,7 +9,7 @@ from argparse import RawTextHelpFormatter from django.core.management.base import BaseCommand, CommandError -logger = logging.getLogger('aircox.tools') +logger = logging.getLogger('aircox.commands') class Stats: diff --git a/aircox/management/commands/streamer.py b/aircox/management/commands/streamer.py index c3c66a0..31b0485 100755 --- a/aircox/management/commands/streamer.py +++ b/aircox/management/commands/streamer.py @@ -6,27 +6,31 @@ used to: - cancels Diffusions that have an archive but could not have been played; - run Liquidsoap """ +# TODO: +# x controllers: remaining +# x diffusion conflicts +# x cancel +# x when liquidsoap fails to start/exists: exit +# - handle restart after failure +# - file in queue without sound not logged? +# - is stream restart after live ok? from argparse import RawTextHelpFormatter import time import pytz -import tzlocal +from django.db.models import Q from django.core.management.base import BaseCommand from django.utils import timezone as tz -from aircox.models import Station, Episode, Diffusion, Track, Sound, Log from aircox.controllers import Streamer, PlaylistSource +from aircox.models import Station, Episode, Diffusion, Track, Sound, Log +from aircox.utils import date_range + # force using UTC tz.activate(pytz.UTC) -# FIXME liquidsoap does not manage timezones -- we have to convert -# 'on_air' metadata we get from it into utc one in order to work -# correctly. -local_tz = tzlocal.get_localzone() - - class Monitor: """ Log and launch diffusions for the given station. @@ -42,6 +46,8 @@ class Monitor: """ streamer = None """ Streamer controller """ + delay = None + """ Timedelta: minimal delay between two call of monitor. """ logs = None """ Queryset to station's logs (ordered by -pk) """ cancel_timeout = 20 @@ -65,8 +71,11 @@ class Monitor: """ Log of last triggered item (sound or diffusion). """ return self.logs.start().with_diff().first() - def __init__(self, streamer, **kwargs): + def __init__(self, streamer, delay, cancel_timeout, **kwargs): self.streamer = streamer + # adding time ensure all calculation have a margin + self.delay = delay + tz.timedelta(seconds=5) + self.cancel_timeout = cancel_timeout self.__dict__.update(kwargs) self.logs = self.get_logs_queryset() @@ -81,6 +90,27 @@ class Monitor: return self.streamer.fetch() + + # Skip tracing - analyzis: + # Reason: multiple database request every x seconds, reducing it. + # We could skip this part when remaining time is higher than a minimal + # value (which should be derived from Command's delay). Problems: + # - How to trace tracks? (+ Source can change: caching log might sucks) + # - if liquidsoap's source/request changes: remaining time goes higher, + # thus avoiding fetch + # + # Approach: something like having a mean time, such as: + # + # ``` + # source = stream.source + # mean_time = source.air_time + # + min(next_track.timestamp, source.remaining) + # - (command.delay + 1) + # trace_required = \/ source' != source + # \/ source.uri' != source.uri + # \/ now < mean_time + # ``` + # source = self.streamer.source if source and source.uri: log = self.trace_sound(source) @@ -102,21 +132,22 @@ class Monitor: def trace_sound(self, source): """ Return on air sound log (create if not present). """ - sound_path, air_time = source.uri, source.air_time + air_uri, air_time = source.uri, source.air_time # check if there is yet a log for this sound on the source - delta = tz.timedelta(seconds=5) - air_times = (air_time - delta, air_time + delta) - - log = self.logs.on_air().filter(source=source.id, - sound__path=sound_path, - date__range=air_times).first() + log = self.logs.on_air().filter( + Q(sound__path=air_uri) | + # sound can be null when arbitrary sound file is played + Q(sound__isnull=True, track__isnull=True, comment=air_uri), + source=source.id, + date__range=date_range(air_time, self.delay), + ).first() if log: return log # get sound diff = None - sound = Sound.objects.filter(path=sound_path).first() + sound = Sound.objects.filter(path=air_uri).first() if sound and sound.episode_id is not None: diff = Diffusion.objects.episode(id=sound.episode_id).on_air() \ .now(air_time).first() @@ -124,7 +155,7 @@ class Monitor: # log sound on air return self.log(type=Log.Type.on_air, date=source.air_time, source=source.id, sound=sound, diffusion=diff, - comment=sound_path) + comment=air_uri) def trace_tracks(self, log): """ @@ -155,19 +186,56 @@ class Monitor: Handle scheduled diffusion, trigger if needed, preload playlists and so on. """ - # TODO: restart - # TODO: handle conflict + cancel - diff = Diffusion.objects.station(self.station).on_air().now() \ + # TODO: program restart + + # Diffusion conflicts are handled by the way a diffusion is defined + # as candidate for the next dealer's start. + # + # ``` + # logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START + # /\ log.diff = diff + # /\ log.date = diff.start + # queue_empty: /\ dealer.queue is empty + # /\ \/ ~dealer.on_air + # \/ dealer.remaining < delay + # + # start_allowed: /\ diff not in logged_diff + # /\ queue_empty + # + # start_canceled: /\ diff not in logged diff + # /\ ~queue_empty + # /\ diff.start < now + cancel_timeout + # ``` + # + now = tz.now() + diff = Diffusion.objects.station(self.station).on_air().now(now) \ .filter(episode__sound__type=Sound.Type.archive) \ .first() - log = self.logs.start().filter(diffusion=diff) if diff else None + # Can't use delay: diffusion may start later than its assigned start. + log = None if not diff else self.logs.start().filter(diffusion=diff) if not diff or log: return - playlist = Sound.objects.episode(id=diff.episode_id).paths() dealer = self.streamer.dealer - dealer.queue(*playlist) - self.log(type=Log.Type.start, source=dealer.id, diffusion=diff, + # start + if not dealer.queue and dealer.rid is None or \ + dealer.remaining < self.delay.total_seconds(): + self.start_diff(dealer, diff) + + # cancel + if diff.start < now - self.cancel_timeout: + self.cancel_diff(dealer, diff) + + def start_diff(self, source, diff): + playlist = Sound.objects.episode(id=diff.episode_id).paths() + source.append(*playlist) + self.log(type=Log.Type.start, source=source.id, diffusion=diff, + comment=str(diff)) + + def cancel_diff(self, source, diff): + diff.type = Diffusion.Type.cancel + diff.save() + self.log(type=Log.Type.cancel, source=source.id, diffusion=diff, comment=str(diff)) def sync(self): @@ -207,7 +275,8 @@ class Command (BaseCommand): '-d', '--delay', type=int, default=1000, help='time to sleep in MILLISECONDS between two updates when we ' - 'monitor' + 'monitor. This influence the delay before a diffusion is ' + 'launched.' ) group.add_argument( '-s', '--station', type=str, action='append', @@ -215,7 +284,7 @@ class Command (BaseCommand): 'all stations' ) group.add_argument( - '-t', '--timeout', type=int, + '-t', '--timeout', type=float, default=Monitor.cancel_timeout, help='time to wait in MINUTES before canceling a diffusion that ' 'should have ran but did not. ' @@ -226,7 +295,6 @@ class Command (BaseCommand): config=None, run=None, monitor=None, station=[], delay=1000, timeout=600, **options): - stations = Station.objects.filter(name__in=station) if station else \ Station.objects.all() streamers = [Streamer(station) for station in stations] @@ -238,14 +306,15 @@ class Command (BaseCommand): streamer.run_process() if monitor: - monitors = [Monitor(streamer, cancel_timeout=timeout) + delay = tz.timedelta(milliseconds=delay) + timeout = tz.timedelta(minutes=timeout) + monitors = [Monitor(streamer, delay, timeout) for streamer in streamers] - delay = delay / 1000 - while True: + while not run or streamer.is_running: for monitor in monitors: monitor.monitor() - time.sleep(delay) + time.sleep(delay.total_seconds()) if run: for streamer in streamers: diff --git a/aircox/models/__pycache__/episode.cpython-37.pyc b/aircox/models/__pycache__/episode.cpython-37.pyc index 5daa7d1..0246dd5 100644 Binary files a/aircox/models/__pycache__/episode.cpython-37.pyc and b/aircox/models/__pycache__/episode.cpython-37.pyc differ diff --git a/aircox/models/__pycache__/log.cpython-37.pyc b/aircox/models/__pycache__/log.cpython-37.pyc index 893589b..556d5da 100644 Binary files a/aircox/models/__pycache__/log.cpython-37.pyc and b/aircox/models/__pycache__/log.cpython-37.pyc differ diff --git a/aircox/models/episode.py b/aircox/models/episode.py index 5d31e4c..3c9a829 100644 --- a/aircox/models/episode.py +++ b/aircox/models/episode.py @@ -122,7 +122,7 @@ class Diffusion(BaseRerun): class Type(IntEnum): on_air = 0x00 unconfirmed = 0x01 - canceled = 0x02 + cancel = 0x02 episode = models.ForeignKey( Episode, models.CASCADE, diff --git a/aircox/models/log.py b/aircox/models/log.py index 770297e..6caf041 100644 --- a/aircox/models/log.py +++ b/aircox/models/log.py @@ -167,18 +167,14 @@ class Log(models.Model): """ Source has been stopped, e.g. manually """ + # Rule: \/ diffusion != null \/ sound != null start = 0x01 - """ - The diffusion or sound has been triggered by the streamer or - manually. - """ - load = 0x02 - """ - A playlist has updated, and loading started. A related Diffusion - does not means that the playlist is only for it (e.g. after a - crash, it can reload previous remaining sound files + thoses of - the next diffusion) - """ + """ Diffusion or sound has been request to be played. """ + cancel = 0x02 + """ Diffusion has been canceled. """ + # Rule: \/ sound != null /\ track == null + # \/ sound == null /\ track != null + # \/ sound == null /\ track == null /\ comment = sound_path on_air = 0x03 """ The sound or diffusion has been detected occurring on air. Can @@ -186,9 +182,7 @@ class Log(models.Model): them since they don't have an attached sound archive. """ other = 0x04 - """ - Other log - """ + """ Other log """ station = models.ForeignKey( Station, models.CASCADE, @@ -216,12 +210,6 @@ class Log(models.Model): max_length=512, blank=True, null=True, verbose_name=_('comment'), ) - - diffusion = models.ForeignKey( - Diffusion, models.SET_NULL, - blank=True, null=True, db_index=True, - verbose_name=_('Diffusion'), - ) sound = models.ForeignKey( Sound, models.SET_NULL, blank=True, null=True, db_index=True, @@ -232,6 +220,11 @@ class Log(models.Model): blank=True, null=True, db_index=True, verbose_name=_('Track'), ) + diffusion = models.ForeignKey( + Diffusion, models.SET_NULL, + blank=True, null=True, db_index=True, + verbose_name=_('Diffusion'), + ) objects = LogQuerySet.as_manager() diff --git a/aircox/templates/aircox/scripts/station.liq b/aircox/templates/aircox/scripts/station.liq index 818f673..4c74d5d 100755 --- a/aircox/templates/aircox/scripts/station.liq +++ b/aircox/templates/aircox/scripts/station.liq @@ -42,6 +42,11 @@ def interactive (id, s) = description="Seek to a relative position", usage="seek ", "seek", fun (x) -> begin seek(s, x) end) + server.register(namespace=id, + description="Get source's track remaining time", + usage="remaining", + "remaining", fun (_) -> begin json_of(source.remaining(s)) end) + s = store_metadata(id=id, size=1, s) add_skip_command(s) s diff --git a/aircox/utils.py b/aircox/utils.py index bc40228..34d2c4c 100755 --- a/aircox/utils.py +++ b/aircox/utils.py @@ -2,16 +2,18 @@ import datetime import django.utils.timezone as tz -def date_range(date): +def date_range(date, delta=None, **delta_kwargs): """ + Return a range of provided date such as `[date-delta, date+delta]`. + :param date: the reference date + :param delta: timedelta + :param \**delta_kwargs: timedelta init arguments + Return a datetime range for a given day, as: ```(date, 0:0:0:0; date, 23:59:59:999)```. """ - date = date_or_default(date, tz.datetime) - return ( - date.replace(hour=0, minute=0, second=0), - date.replace(hour=23, minute=59, second=59, microsecond=999) - ) + delta = tz.timedelta(**delta_kwargs) if delta is None else delta + return [date - delta, date + delta] def cast_date(date, into=datetime.date): diff --git a/setup.py b/setup.py index be65c0d..44b04aa 100755 --- a/setup.py +++ b/setup.py @@ -15,12 +15,12 @@ def to_array (path): setup( name='aircox', - version='0.1', + version='0.9', license='GPLv3', author='bkfox', - description='Aircox is a radio programs manager that includes tools and cms', + description='Aircox is a radio programs manager including tools and cms', long_description=to_rst('README.md'), - url='http://bkfox.net/', + url='https://github.com/bkfox/aircox', packages=find_packages(), include_package_data=True, install_requires=to_array('requirements.txt'),