redesign streamer, make it sexy

This commit is contained in:
bkfox 2019-08-02 17:06:23 +02:00
parent 8e1d2b6769
commit 432467ec8d
18 changed files with 210 additions and 100 deletions

View File

@ -1,3 +0,0 @@
include LICENSE
include README.md

View File

View File

@ -22,7 +22,7 @@ class StationAdmin(admin.ModelAdmin):
@admin.register(Log)
class LogAdmin(admin.ModelAdmin):
list_display = ['id', 'date', 'station', 'source', 'type', 'diffusion', 'sound', 'track']
list_display = ['id', 'date', 'station', 'source', 'type', 'comment']
list_filter = ['date', 'source', 'station']

View File

@ -17,6 +17,10 @@ from .models import Port, Station, Sound
from .connector import Connector
# FIXME liquidsoap does not manage timezones -- we have to convert
# 'on_air' metadata we get from it into utc one in order to work
# correctly.
local_tz = tzlocal.get_localzone()
logger = logging.getLogger('aircox')
@ -32,7 +36,11 @@ class Streamer:
sources = None
""" List of all monitored sources """
source = None
""" Current on air source """
""" Current source being played on air """
# note: we disable on_air rids since we don't have use of it for the
# moment
# on_air = None
# """ On-air request ids (rid) """
def __init__(self, station):
self.station = station
@ -46,6 +54,27 @@ class Streamer:
""" Path to Unix socket file """
return self.connector.address
@property
def is_ready(self):
"""
If external program is ready to use, returns True
"""
return self.send('list') != ''
@property
def is_running(self):
if self.process is None:
return False
returncode = self.process.poll()
if returncode is None:
return True
self.process = None
logger.debug('process died with return code %s' % returncode)
return False
# FIXME: is it really needed as property?
@property
def inputs(self):
""" Return input ports of the station """
@ -62,13 +91,6 @@ class Streamer:
active=True,
)
@property
def is_ready(self):
"""
If external program is ready to use, returns True
"""
return self.send('list') != ''
# Sources and config ###############################################
def send(self, *args, **kwargs):
return self.connector.send(*args, **kwargs) or ''
@ -98,26 +120,27 @@ class Streamer:
def sync(self):
""" Sync all sources. """
if self.process is None:
return
for source in self.sources:
source.sync()
def fetch(self):
""" Fetch data from liquidsoap """
if self.process is None:
return
for source in self.sources:
source.fetch()
rid = self.send('request.on_air').split(' ')
if rid:
rid = rid[-1]
# data = self._send('request.metadata ', rid, parse=True)
# if not data:
# return
pred = lambda s: s.rid == rid
else:
pred = lambda s: s.is_playing
# request.on_air is not ordered: we need to do it manually
if self.dealer.is_playing:
self.source = self.dealer
return
self.source = next((source for source in self.sources if pred(source)),
self.source)
self.source = next((source for source in self.sources
if source.is_playing), None)
# Process ##########################################################
def get_process_args(self):
@ -152,10 +175,8 @@ class Streamer:
def kill_process(self):
if self.process:
logger.info("kill process {pid}: {info}".format(
pid=self.process.pid,
info=' '.join(self.get_process_args())
))
logger.debug("kill process %s: %s", self.process.pid,
' '.join(self.get_process_args()))
self.process.kill()
self.process = None
@ -170,12 +191,19 @@ class Streamer:
class Source:
controller = None
""" parent controller """
id = None
""" source id """
uri = ''
""" source uri """
rid = None
""" request id """
air_time = None
""" on air time """
status = None
""" source status """
remaining = 0.0
""" remaining time """
@property
def station(self):
@ -185,6 +213,10 @@ class Source:
def is_playing(self):
return self.status == 'playing'
#@property
#def is_on_air(self):
# return self.rid is not None and self.rid in self.controller.on_air
def __init__(self, controller, id=None):
self.controller = controller
self.id = id
@ -194,14 +226,17 @@ class Source:
pass
def fetch(self):
data = self.controller.send(self.id, '.remaining')
self.remaining = float(data)
data = self.controller.send(self.id, '.get', parse=True)
self.on_metadata(data if data and isinstance(data, dict) else {})
def on_metadata(self, data):
""" Update source info from provided request metadata """
self.rid = data.get('rid')
self.uri = data.get('initial_uri')
self.status = data.get('status')
self.rid = data.get('rid') or None
self.uri = data.get('initial_uri') or None
self.status = data.get('status') or None
air_time = data.get('on_air')
if air_time:
@ -277,8 +312,17 @@ class PlaylistSource(Source):
class QueueSource(Source):
def queue(self, *paths):
queue = None
""" Source's queue (excluded on_air request) """
def append(self, *paths):
""" Add the provided paths to source's play queue """
for path in paths:
print(self.controller.send(self.id, '_queue.push ', path))
self.controller.send(self.id, '_queue.push ', path)
def fetch(self):
super().fetch()
queue = self.controller.send(self.id, '_queue.queue').split(' ')
self.queue = queue

View File

@ -12,7 +12,7 @@ from django.utils import timezone as tz
import aircox.settings as settings
from aircox.models import Log, Station
logger = logging.getLogger('aircox.tools')
logger = logging.getLogger('aircox.commands')
class Command (BaseCommand):

View File

@ -25,7 +25,7 @@ from django.utils import timezone as tz
from aircox.models import Schedule, Diffusion
logger = logging.getLogger('aircox.tools')
logger = logging.getLogger('aircox.commands')
class Actions:

View File

@ -23,7 +23,7 @@ from aircox.models import *
__doc__ = __doc__.format(settings=settings)
logger = logging.getLogger('aircox.tools')
logger = logging.getLogger('aircox.commands')
class PlaylistImport:

View File

@ -42,7 +42,7 @@ from aircox import settings, utils
from aircox.models import Diffusion, Program, Sound
from .import_playlist import PlaylistImport
logger = logging.getLogger('aircox.tools')
logger = logging.getLogger('aircox.commands')
sound_path_re = re.compile(

View File

@ -9,7 +9,7 @@ from argparse import RawTextHelpFormatter
from django.core.management.base import BaseCommand, CommandError
logger = logging.getLogger('aircox.tools')
logger = logging.getLogger('aircox.commands')
class Stats:

View File

@ -6,27 +6,31 @@ used to:
- cancels Diffusions that have an archive but could not have been played;
- run Liquidsoap
"""
# TODO:
# x controllers: remaining
# x diffusion conflicts
# x cancel
# x when liquidsoap fails to start/exists: exit
# - handle restart after failure
# - file in queue without sound not logged?
# - is stream restart after live ok?
from argparse import RawTextHelpFormatter
import time
import pytz
import tzlocal
from django.db.models import Q
from django.core.management.base import BaseCommand
from django.utils import timezone as tz
from aircox.models import Station, Episode, Diffusion, Track, Sound, Log
from aircox.controllers import Streamer, PlaylistSource
from aircox.models import Station, Episode, Diffusion, Track, Sound, Log
from aircox.utils import date_range
# force using UTC
tz.activate(pytz.UTC)
# FIXME liquidsoap does not manage timezones -- we have to convert
# 'on_air' metadata we get from it into utc one in order to work
# correctly.
local_tz = tzlocal.get_localzone()
class Monitor:
"""
Log and launch diffusions for the given station.
@ -42,6 +46,8 @@ class Monitor:
"""
streamer = None
""" Streamer controller """
delay = None
""" Timedelta: minimal delay between two call of monitor. """
logs = None
""" Queryset to station's logs (ordered by -pk) """
cancel_timeout = 20
@ -65,8 +71,11 @@ class Monitor:
""" Log of last triggered item (sound or diffusion). """
return self.logs.start().with_diff().first()
def __init__(self, streamer, **kwargs):
def __init__(self, streamer, delay, cancel_timeout, **kwargs):
self.streamer = streamer
# adding time ensure all calculation have a margin
self.delay = delay + tz.timedelta(seconds=5)
self.cancel_timeout = cancel_timeout
self.__dict__.update(kwargs)
self.logs = self.get_logs_queryset()
@ -81,6 +90,27 @@ class Monitor:
return
self.streamer.fetch()
# Skip tracing - analyzis:
# Reason: multiple database request every x seconds, reducing it.
# We could skip this part when remaining time is higher than a minimal
# value (which should be derived from Command's delay). Problems:
# - How to trace tracks? (+ Source can change: caching log might sucks)
# - if liquidsoap's source/request changes: remaining time goes higher,
# thus avoiding fetch
#
# Approach: something like having a mean time, such as:
#
# ```
# source = stream.source
# mean_time = source.air_time
# + min(next_track.timestamp, source.remaining)
# - (command.delay + 1)
# trace_required = \/ source' != source
# \/ source.uri' != source.uri
# \/ now < mean_time
# ```
#
source = self.streamer.source
if source and source.uri:
log = self.trace_sound(source)
@ -102,21 +132,22 @@ class Monitor:
def trace_sound(self, source):
""" Return on air sound log (create if not present). """
sound_path, air_time = source.uri, source.air_time
air_uri, air_time = source.uri, source.air_time
# check if there is yet a log for this sound on the source
delta = tz.timedelta(seconds=5)
air_times = (air_time - delta, air_time + delta)
log = self.logs.on_air().filter(source=source.id,
sound__path=sound_path,
date__range=air_times).first()
log = self.logs.on_air().filter(
Q(sound__path=air_uri) |
# sound can be null when arbitrary sound file is played
Q(sound__isnull=True, track__isnull=True, comment=air_uri),
source=source.id,
date__range=date_range(air_time, self.delay),
).first()
if log:
return log
# get sound
diff = None
sound = Sound.objects.filter(path=sound_path).first()
sound = Sound.objects.filter(path=air_uri).first()
if sound and sound.episode_id is not None:
diff = Diffusion.objects.episode(id=sound.episode_id).on_air() \
.now(air_time).first()
@ -124,7 +155,7 @@ class Monitor:
# log sound on air
return self.log(type=Log.Type.on_air, date=source.air_time,
source=source.id, sound=sound, diffusion=diff,
comment=sound_path)
comment=air_uri)
def trace_tracks(self, log):
"""
@ -155,19 +186,56 @@ class Monitor:
Handle scheduled diffusion, trigger if needed, preload playlists
and so on.
"""
# TODO: restart
# TODO: handle conflict + cancel
diff = Diffusion.objects.station(self.station).on_air().now() \
# TODO: program restart
# Diffusion conflicts are handled by the way a diffusion is defined
# as candidate for the next dealer's start.
#
# ```
# logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START
# /\ log.diff = diff
# /\ log.date = diff.start
# queue_empty: /\ dealer.queue is empty
# /\ \/ ~dealer.on_air
# \/ dealer.remaining < delay
#
# start_allowed: /\ diff not in logged_diff
# /\ queue_empty
#
# start_canceled: /\ diff not in logged diff
# /\ ~queue_empty
# /\ diff.start < now + cancel_timeout
# ```
#
now = tz.now()
diff = Diffusion.objects.station(self.station).on_air().now(now) \
.filter(episode__sound__type=Sound.Type.archive) \
.first()
log = self.logs.start().filter(diffusion=diff) if diff else None
# Can't use delay: diffusion may start later than its assigned start.
log = None if not diff else self.logs.start().filter(diffusion=diff)
if not diff or log:
return
playlist = Sound.objects.episode(id=diff.episode_id).paths()
dealer = self.streamer.dealer
dealer.queue(*playlist)
self.log(type=Log.Type.start, source=dealer.id, diffusion=diff,
# start
if not dealer.queue and dealer.rid is None or \
dealer.remaining < self.delay.total_seconds():
self.start_diff(dealer, diff)
# cancel
if diff.start < now - self.cancel_timeout:
self.cancel_diff(dealer, diff)
def start_diff(self, source, diff):
playlist = Sound.objects.episode(id=diff.episode_id).paths()
source.append(*playlist)
self.log(type=Log.Type.start, source=source.id, diffusion=diff,
comment=str(diff))
def cancel_diff(self, source, diff):
diff.type = Diffusion.Type.cancel
diff.save()
self.log(type=Log.Type.cancel, source=source.id, diffusion=diff,
comment=str(diff))
def sync(self):
@ -207,7 +275,8 @@ class Command (BaseCommand):
'-d', '--delay', type=int,
default=1000,
help='time to sleep in MILLISECONDS between two updates when we '
'monitor'
'monitor. This influence the delay before a diffusion is '
'launched.'
)
group.add_argument(
'-s', '--station', type=str, action='append',
@ -215,7 +284,7 @@ class Command (BaseCommand):
'all stations'
)
group.add_argument(
'-t', '--timeout', type=int,
'-t', '--timeout', type=float,
default=Monitor.cancel_timeout,
help='time to wait in MINUTES before canceling a diffusion that '
'should have ran but did not. '
@ -226,7 +295,6 @@ class Command (BaseCommand):
config=None, run=None, monitor=None,
station=[], delay=1000, timeout=600,
**options):
stations = Station.objects.filter(name__in=station) if station else \
Station.objects.all()
streamers = [Streamer(station) for station in stations]
@ -238,14 +306,15 @@ class Command (BaseCommand):
streamer.run_process()
if monitor:
monitors = [Monitor(streamer, cancel_timeout=timeout)
delay = tz.timedelta(milliseconds=delay)
timeout = tz.timedelta(minutes=timeout)
monitors = [Monitor(streamer, delay, timeout)
for streamer in streamers]
delay = delay / 1000
while True:
while not run or streamer.is_running:
for monitor in monitors:
monitor.monitor()
time.sleep(delay)
time.sleep(delay.total_seconds())
if run:
for streamer in streamers:

View File

@ -122,7 +122,7 @@ class Diffusion(BaseRerun):
class Type(IntEnum):
on_air = 0x00
unconfirmed = 0x01
canceled = 0x02
cancel = 0x02
episode = models.ForeignKey(
Episode, models.CASCADE,

View File

@ -167,18 +167,14 @@ class Log(models.Model):
"""
Source has been stopped, e.g. manually
"""
# Rule: \/ diffusion != null \/ sound != null
start = 0x01
"""
The diffusion or sound has been triggered by the streamer or
manually.
"""
load = 0x02
"""
A playlist has updated, and loading started. A related Diffusion
does not means that the playlist is only for it (e.g. after a
crash, it can reload previous remaining sound files + thoses of
the next diffusion)
"""
""" Diffusion or sound has been request to be played. """
cancel = 0x02
""" Diffusion has been canceled. """
# Rule: \/ sound != null /\ track == null
# \/ sound == null /\ track != null
# \/ sound == null /\ track == null /\ comment = sound_path
on_air = 0x03
"""
The sound or diffusion has been detected occurring on air. Can
@ -186,9 +182,7 @@ class Log(models.Model):
them since they don't have an attached sound archive.
"""
other = 0x04
"""
Other log
"""
""" Other log """
station = models.ForeignKey(
Station, models.CASCADE,
@ -216,12 +210,6 @@ class Log(models.Model):
max_length=512, blank=True, null=True,
verbose_name=_('comment'),
)
diffusion = models.ForeignKey(
Diffusion, models.SET_NULL,
blank=True, null=True, db_index=True,
verbose_name=_('Diffusion'),
)
sound = models.ForeignKey(
Sound, models.SET_NULL,
blank=True, null=True, db_index=True,
@ -232,6 +220,11 @@ class Log(models.Model):
blank=True, null=True, db_index=True,
verbose_name=_('Track'),
)
diffusion = models.ForeignKey(
Diffusion, models.SET_NULL,
blank=True, null=True, db_index=True,
verbose_name=_('Diffusion'),
)
objects = LogQuerySet.as_manager()

View File

@ -42,6 +42,11 @@ def interactive (id, s) =
description="Seek to a relative position",
usage="seek <duration>",
"seek", fun (x) -> begin seek(s, x) end)
server.register(namespace=id,
description="Get source's track remaining time",
usage="remaining",
"remaining", fun (_) -> begin json_of(source.remaining(s)) end)
s = store_metadata(id=id, size=1, s)
add_skip_command(s)
s

View File

@ -2,16 +2,18 @@ import datetime
import django.utils.timezone as tz
def date_range(date):
def date_range(date, delta=None, **delta_kwargs):
"""
Return a range of provided date such as `[date-delta, date+delta]`.
:param date: the reference date
:param delta: timedelta
:param \**delta_kwargs: timedelta init arguments
Return a datetime range for a given day, as:
```(date, 0:0:0:0; date, 23:59:59:999)```.
"""
date = date_or_default(date, tz.datetime)
return (
date.replace(hour=0, minute=0, second=0),
date.replace(hour=23, minute=59, second=59, microsecond=999)
)
delta = tz.timedelta(**delta_kwargs) if delta is None else delta
return [date - delta, date + delta]
def cast_date(date, into=datetime.date):

View File

@ -15,12 +15,12 @@ def to_array (path):
setup(
name='aircox',
version='0.1',
version='0.9',
license='GPLv3',
author='bkfox',
description='Aircox is a radio programs manager that includes tools and cms',
description='Aircox is a radio programs manager including tools and cms',
long_description=to_rst('README.md'),
url='http://bkfox.net/',
url='https://github.com/bkfox/aircox',
packages=find_packages(),
include_package_data=True,
install_requires=to_array('requirements.txt'),