start app controllers that aims to replace liquidsoap on long term, and be more generic and reusable

This commit is contained in:
bkfox 2016-07-12 11:11:21 +02:00
parent 37b807b403
commit 0d75f65ed4
12 changed files with 988 additions and 87 deletions

2
controllers/__init__.py Normal file
View File

@ -0,0 +1,2 @@

17
controllers/admin.py Normal file
View File

@ -0,0 +1,17 @@
from django.contrib import admin
import aircox.controllers.models as models
#@admin.register(Log)
#class LogAdmin(admin.ModelAdmin):
# list_display = ['id', 'date', 'source', 'comment', 'related_object']
# list_filter = ['date', 'source', 'related_type']
admin.site.register(models.Station)
admin.site.register(models.Source)
admin.site.register(models.Output)
admin.site.register(models.Log)

415
controllers/models.py Normal file
View File

@ -0,0 +1,415 @@
"""
Classes that define common interfaces in order to control external
software that generate the audio streams for us, such as Liquidsoap.
It must be implemented per program in order to work.
Basically, we follow the follow the idea that a Station has different
sources that are used to generate the audio stream:
- **stream**: one source per Streamed program;
- **dealer**: one source for all Scheduled programs;
- **master**: main output
"""
import os
from enum import Enum, IntEnum
from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.utils.translation import ugettext as _, ugettext_lazy
import aircox.programs.models as programs
from aircox.programs.utils import to_timedelta
import aircox.controllers.settings as settings
from aircox.controllers.plugins.plugins import Plugins
Plugins.discover()
class Station(programs.Nameable):
path = models.CharField(
_('path'),
help_text = _('path to the working directory'),
max_length = 256,
blank = True,
)
plugin_name = models.CharField(
_('plugin'),
max_length = 32,
choices = [ (name, name) for name in Plugins.registry.keys() ],
)
plugin = None
"""
The plugin used for this station. This is initialized at __init__,
based on self.plugin_name and should not be changed.
"""
controller = None
"""
Controllers over the station. It is implemented by the plugin using
plugin.StationController
"""
def get_sources(self, type = None, prepare = True):
"""
Return a list of active sources that can have their controllers
initialized.
"""
qs = self.source_set.filter(active = True)
if type:
qs = qs.filter(type = type)
return [ source.prepare() or source for source in qs ]
@property
def dealer_sources(self):
return self.get_sources(Source.Type.dealer)
@property
def dealer(self):
dealers = self.dealer_sources
return dealers[0] if dealers else None
@property
def stream_sources(self):
return self.get_sources(type = Source.Type.stream)
@property
def file_sources(self):
return self.get_sources(type = Source.Type.file)
@property
def fallback_sources(self):
return self.get_sources(type = Source.Type.fallback)
@property
def outputs(self):
"""
List of active outputs
"""
return [ output for output in self.output_set if output.active ]
def prepare(self, fetch = True):
"""
Initialize station's controller. Does not initialize sources'
controllers.
Note that the Station must have been saved first, in order to
have correct informations such as the working path.
"""
if not self.pk:
raise ValueError('station be must saved first')
self.controller = self.plugin.init_station(self)
for source in self.source_set.all():
source.prepare()
if fetch:
self.controller.fetch()
def make_sources(self):
"""
Generate default sources for the station and save them.
"""
Source(station = self,
type = Source.Type.dealer,
name = _('Dealer')).save()
streams = programs.Program.objects.filter(
active = True, stream__isnull = False
)
for stream in streams:
Source(station = self,
type = Source.Type.stream,
name = stream.name,
program = stream).save()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.plugin_name:
self.plugin = Plugins.registry.get(self.plugin_name)
def save(self, make_sources = True, *args, **kwargs):
"""
* make_sources: if the model has not been yet saved, generate
sources for it.
"""
if not self.path:
self.path = os.path.join(
settings.AIRCOX_CONTROLLERS_MEDIA,
self.slug
)
super().save(*args, **kwargs)
if make_sources and not self.source_set.count():
self.make_sources()
self.prepare()
# test
self.prepare()
self.controller.push()
class Source(programs.Nameable):
"""
Source designate a source for the audio stream.
A Source can have different types, that are declared here by order
of priority. A lower value priority means that the source has a higher
priority.
"""
class Type(IntEnum):
file = 0x01
"""
Use a file as input, that can either be a local or distant file.
Path must be set.
Should be used only for exceptional cases, such as streaming from
distant place.
"""
dealer = 0x02
"""
This source is used for scheduled programs. For the moment only
one should be set.
"""
stream = 0x03
"""
Source related to a streamed programs (one for each). programs.Program
must be set in this case.
It uses program's stream information in order to generate correct
delays or time ranges.
"""
fallback = 0x05
"""
Same as file, but declared with a lower priority than streams.
Their goal is to be used when no other source is available, so
it is NOT interactive.
"""
station = models.ForeignKey(
Station,
verbose_name = _('station'),
)
type = models.SmallIntegerField(
verbose_name = _('type'),
choices = [ (int(y), _(x)) for x,y in Type.__members__.items() ],
)
active = models.BooleanField(
_('active'),
default = True,
help_text = _('this source is active')
)
program = models.ForeignKey(
programs.Program,
verbose_name = _('related program'),
blank = True, null = True
)
url = models.TextField(
_('url'),
blank = True, null = True,
help_text = _('url related to a file local or distant')
)
controller = None
"""
Implement controls over a Source. This is done by the plugin, that
implements plugin.SourceController;
"""
@property
def stream(self):
if self.type != self.Type.stream or not self.program:
return
return self.program.stream_set and self.program.stream_set.first()
def prepare(self, fetch = True):
"""
Create a controller
"""
self.controller = self.station.plugin.init_source(self)
if fetch:
self.controller.fetch()
def load_playlist(self, diffusion = None, program = None):
"""
Load a playlist to the controller. If diffusion or program is
given use it, otherwise, try with self.program if exists, or
(if URI, self.value).
A playlist from a program uses all archives available for the
program.
"""
if diffusion:
self.controller.playlist = diffusion.playlist
return
program = program or self.stream
if program:
self.controller.playlist = [ sound.path for sound in
programs.Sound.objects.filter(
type = programs.Sound.Type.archive,
removed = False,
path__startswith = program.path
)
]
return
if self.type == self.Type.file and self.value:
self.controller.playlist = [ self.value ]
return
def save(self, *args, **kwargs):
if self.type in (self.Type.file, self.Type.fallback) and \
not self.url:
raise ValueError('url is missing but required')
if self.type == self.Type.stream and \
(not self.program or not self.program.stream_set.count()):
raise ValueError('missing related stream program; program must be '
'a streamed program')
super().save(*args, **kwargs)
# TODO update controls
class Output (models.Model):
class Type(IntEnum):
jack = 0x00
alsa = 0x01
icecast = 0x02
station = models.ForeignKey(
Station,
verbose_name = _('station'),
)
type = models.SmallIntegerField(
_('type'),
# we don't translate the names since it is project names.
choices = [ (int(y), x) for x,y in Type.__members__.items() ],
)
active = models.BooleanField(
_('active'),
default = True,
help_text = _('this output is active')
)
settings = models.TextField(
_('output settings'),
help_text = _('list of comma separated params available; '
'this is put in the output config as raw code; '
'plugin related'),
blank = True, null = True
)
class Log(models.Model):
"""
Log sounds and diffusions that are played on the station.
This only remember what has been played on the outputs, not on each
track; Source designate here which source is responsible of that.
"""
class Type(IntEnum):
stop = 0x00
"""
Source has been stopped (only when there is no more sound)
"""
play = 0x01
"""
Source has been started/changed and is running related_object
If no related_object is available, comment is used to designate
the sound.
"""
load = 0x02
"""
Source starts to be preload related_object
"""
type = models.SmallIntegerField(
verbose_name = _('type'),
choices = [ (int(y), _(x)) for x,y in Type.__members__.items() ],
blank = True, null = True,
)
station = models.ForeignKey(
Station,
verbose_name = _('station'),
help_text = _('station on which the event occured'),
)
source = models.CharField(
# we use a CharField to avoid loosing logs information if the
# source is removed
_('source'),
max_length=64,
help_text = _('source id that make it happen on the station'),
blank = True, null = True,
)
date = models.DateTimeField(
_('date'),
auto_now_add=True,
)
comment = models.CharField(
_('comment'),
max_length = 512,
blank = True, null = True,
)
related_type = models.ForeignKey(
ContentType,
blank = True, null = True,
)
related_id = models.PositiveIntegerField(
blank = True, null = True,
)
related = GenericForeignKey(
'related_type', 'related_id',
)
@property
def end(self):
"""
Calculated end using self.related informations
"""
if self.related_type == programs.Diffusion:
return self.related.end
if self.related_type == programs.Sound:
return self.date + to_timedelta(self.duration)
return self.date
def is_expired(self, date = None):
"""
Return True if the log is expired. Note that it only check
against the date, so it is still possible that the expiration
occured because of a Stop or other source.
"""
date = programs.date_or_default(date)
return self.end < date
@classmethod
def get_for(cl, object = None, model = None):
"""
Return a queryset that filter on the related object. If object is
given, filter using it, otherwise only using model.
If model is not given, uses object's type.
"""
if not model and object:
model = type(object)
qs = cl.objects.filter(related_type__pk =
ContentTYpe.objects.get_for_model(model).id)
if object:
qs = qs.filter(related_id = object.pk)
return qs
def print(self):
logger.info('log #%s: %s%s',
str(self),
self.comment or '',
' -- {} #{}'.format(self.related_type, self.related_id)
if self.related_object else ''
)
def __str__(self):
return '#{} ({}, {})'.format(
self.id, self.date.strftime('%Y/%m/%d %H:%M'), self.source.name
)

144
controllers/monitor.py Normal file
View File

@ -0,0 +1,144 @@
from django.utils import timezone as tz
import aircox.programs.models as programs
from aircox.controller.models import Log
class Monitor:
"""
Log and launch diffusions for the given station.
Monitor should be able to be used after a crash a go back
where it was playing, so we heavily use logs to be able to
do that.
"""
station = None
@staticmethod
def log(**kwargs):
"""
Create a log using **kwargs, and print info
"""
log = programs.Log(station = self.station, **kwargs)
log.save()
log.print()
def track(self):
"""
Check the current_sound of the station and update logs if
needed
"""
station = self.station
station.controller.fetch()
current_sound = station.controller.current_sound
current_source = station.controller.current_source
if not current_sound:
return
log = Log.get_for(model = programs.Sound) \
.filter(station = station).order_by('date').last()
# TODO: expiration
if log and (log.source == current_source and \
log.related.path == current_sound):
return
sound = programs.Sound.object.filter(path = current_sound)
self.log(
type = Log.Type.play,
source = current_source,
date = tz.make_aware(tz.datetime.now()),
related = sound[0] if sound else None,
comment = None if sound else current_sound,
)
def __current_diff(self):
"""
Return a tuple with the currently running diffusion and the items
that still have to be played. If there is not, return None
"""
station = self.station
now = tz.make_aware(tz.datetime.now())
sound_log = Log.get_for(model = programs.Sound) \
.filter(station = station).order_by('date').last()
diff_log = Log.get_for(model = programs.Diffusion) \
.filter(station = station).order_by('date').last()
if not sound_log or not diff_log or \
sound_log.source != diff_log.source or \
diff_log.related.is_date_in_my_range(now) :
return None, []
# last registered diff is still playing: update the playlist
sounds = Log.get_for(model = programs.Sound) \
.filter(station = station, source = diff_log.source) \
.filter(pk__gt = diff.log.pk)
sounds = [ sound.path for sound in sounds if not sound.removed ]
return (
diff_log.related,
[ path for path in diff_log.related.playlist
if path not in sounds ]
)
def __next_diff(self, diff):
"""
Return the tuple with the next diff that should be played and
the playlist
"""
station = self.station
now = tz.make_aware(tz.datetime.now())
args = {'start__gt': diff.start } if diff else {}
diff = programs.Diffusion.get(
now, now = True,
type = programs.Diffusion.Type.normal,
sound__type = programs.Sound.Type.archive,
sound__removed = False,
**args
).distinct().order_by('start').first()
return (diff, diff and diff.playlist or [])
def handle(self):
"""
Handle scheduled diffusion, trigger if needed, preload playlists
and so on.
"""
station = self.station
dealer = station.dealer
if not dealer:
return
now = tz.make_aware(tz.datetime.now())
# current and next diffs
diff, playlist = self.__current_diff()
dealer.on = bool(playlist)
next_diff, next_playlist = self.__next_diff()
playlist += next_playlist
# playlist update
if dealer.playlist != playlist:
dealer.playlist = playlist
if next_diff:
self.log(
type = Log.Type.load,
source = dealer.id,
date = now,
related_object = next_diff
)
# dealer.on when next_diff start <= now
if next_diff and not dealer.on and next_diff.start <= now:
dealer.on = True
for source in station.get_sources():
source.controller.skip()
cl.log(
type = Log.Type.play,
source = dealer.id,
date = now,
related_object = next_diff,
)

View File

@ -0,0 +1,91 @@
import os
import socket
import re
import json
class Connector:
"""
Simple connector class that retrieve/send data through a unix
domain socket file or a TCP/IP connection
It is able to parse list of `key=value`, and JSON data.
"""
__socket = None
__available = False
address = None
"""
a string to the unix domain socket file, or a tuple (host, port) for
TCP/IP connection
"""
@property
def available(self):
return self.__available
def __init__(self, address = None):
if address:
self.address = address
def open(self):
if self.__available:
return
try:
family = socket.AF_INET if type(self.address) in (tuple, list) else \
socket.AF_UNIX
self.__socket = socket.socket(family, socket.SOCK_STREAM)
self.__socket.connect(self.address)
self.__available = True
except:
self.__available = False
return -1
def send(self, *data, try_count = 1, parse = False, parse_json = False):
if self.open():
return ''
data = bytes(''.join([str(d) for d in data]) + '\n', encoding='utf-8')
try:
reg = re.compile(r'(.*)\s+END\s*$')
self.__socket.sendall(data)
data = ''
while not reg.search(data):
data += self.__socket.recv(1024).decode('utf-8')
if data:
data = reg.sub(r'\1', data)
data = data.strip()
if parse:
data = self.parse(data)
elif parse_json:
data = self.parse_json(data)
return data
except:
self.__available = False
if try_count > 0:
return self.send(data, try_count - 1)
def parse(self, string):
string = string.split('\n')
data = {}
for line in string:
line = re.search(r'(?P<key>[^=]+)="?(?P<value>([^"]|\\")+)"?', line)
if not line:
continue
line = line.groupdict()
data[line['key']] = line['value']
return data
def parse_json(self, string):
try:
if string[0] == '"' and string[-1] == '"':
string = string[1:-1]
return json.loads(string) if string else None
except:
return None

View File

@ -0,0 +1,105 @@
import os
import aircox.controllers.plugins.plugins as plugins
from aircox.controllers.plugins.connector import Connector
class LiquidSoap(plugins.Plugin):
@staticmethod
def init_station(station):
return StationController(station = station)
@staticmethod
def init_source(source):
return SourceController(source = source)
class StationController(plugins.StationController):
template_name = 'aircox/controllers/liquidsoap.liq'
socket_path = ''
connector = None
def __init__(self, station, **kwargs):
super().__init__(
station = station,
path = os.path.join(station.path, 'station.liq'),
socket_path = os.path.join(station.path, 'station.sock'),
**kwargs
)
self.connector = Connector(self.socket_path)
def _send(self, *args, **kwargs):
self.connector.send(*args, **kwargs)
def fetch(self):
super().fetch()
data = self._send('request.on_air')
if not data:
return
data = self._send('request.metadata', data, parse = True)
if not data:
return
self.current_sound = data.get('initial_uri')
# FIXME: point to the Source object
self.current_source = data.get('source')
class SourceController(plugins.SourceController):
connector = None
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self.connector = self.source.station.controller.connector
def _send(self, *args, **kwargs):
self.connector.send(*args, **kwargs)
@property
def active(self):
return self._send('var.get ', self.source.slug, '_active') == 'true'
@active.setter
def active(self, value):
return self._send('var.set ', self.source.slug, '_active', '=',
'true' if value else 'false')
def skip(self):
"""
Skip a given source. If no source, use master.
"""
self._send(self.source.slug, '.skip')
def fetch(self):
data = self._send(self.source.slug, '.get', parse = True)
if not data:
return
# FIXME: still usefull? originally tested only if there ass self.program
source = data.get('source') or ''
if not source.startswith(self.id):
return
self.current_sound = data.get('initial_uri')
def stream(self):
"""
Return a dict with stream info for a Stream program, or None if there
is not. Used in the template.
"""
stream = self.source.stream
if not stream or (not stream.begin and not stream.delay):
return
def to_seconds(time):
return 3600 * time.hour + 60 * time.minute + time.second
return {
'begin': stream.begin.strftime('%Hh%M') if stream.begin else None,
'end': stream.end.strftime('%Hh%M') if stream.end else None,
'delay': to_seconds(stream.delay) if stream.delay else 0
}

View File

@ -0,0 +1,198 @@
import os
import re
from django.template.loader import render_to_string
class Plugins(type):
registry = {}
def __new__(cls, name, bases, attrs):
cl = super().__new__(cls, name, bases, attrs)
if name != 'Plugin':
if not cl.name:
cl.name = name.lower()
cls.registry[cl.name] = cl
return cl
@classmethod
def discover(cls):
"""
Discover plugins -- needed because of the import traps
"""
import aircox.controllers.plugins.liquidsoap
class Plugin(metaclass=Plugins):
name = ''
def init_station(self, station):
pass
def init_source(self, source):
pass
class StationController:
"""
Controller of a Station.
"""
station = None
"""
Related station
"""
template_name = ''
"""
If set, use this template in order to generated the configuration
file in self.path file
"""
path = None
"""
Path of the configuration file.
"""
current_sound = ''
"""
Current sound being played (retrieved by fetch)
"""
@property
def id(self):
return '{station.slug}_{station.pk}'.format(station = self.station)
# TODO: add function to launch external program?
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def fetch(self):
"""
Fetch data of the children and so on
The base function just execute the function of all children
sources. The plugin must implement the other extra part
"""
sources = self.station.get_sources()
for source in sources:
if source.controller:
source.controller.fetch()
def push(self, config = True):
"""
Update configuration and children's info.
The base function just execute the function of all children
sources. The plugin must implement the other extra part
"""
sources = self.station.get_sources()
for source in sources:
source.prepare()
if source.controller:
source.controller.push()
if config and self.path and self.template_name:
import aircox.controllers.settings as settings
data = render_to_string(self.template_name, {
'station': self.station,
'settings': settings,
})
data = re.sub('[\t ]+\n', '\n', data)
data = re.sub('\n{3,}', '\n\n', data)
os.makedirs(os.path.dirname(self.path), exist_ok = True)
with open(self.path, 'w+') as file:
file.write(data)
def skip(self):
"""
Skip the current sound on the station
"""
pass
class SourceController:
"""
Controller of a Source. Value are usually updated directly on the
external side.
"""
source = None
"""
Related source
"""
path = ''
"""
Path to the Source's playlist file. Optional.
"""
active = True
"""
Source is available. May be different from the containing Source,
e.g. dealer and liquidsoap.
"""
current_sound = ''
"""
Current sound being played (retrieved by fetch)
"""
current_source = None
"""
Current source being responsible of the current sound
"""
@property
def id(self):
return '{source.station.slug}_{source.slug}'.format(source = self.source)
__playlist = None
@property
def playlist(self):
"""
Current playlist on the Source, list of paths to play
"""
return self.__playlist
@playlist.setter
def playlist(self, value):
self.__playlist = value
self.push()
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
self.__playlist = []
if not self.path:
self.path = os.path.join(self.source.station.path,
self.source.slug + '.m3u')
def skip(self):
"""
Skip the current sound in the source
"""
pass
def fetch(self):
"""
Get the source information
"""
pass
def push(self):
"""
Update data relative to the source on the external program.
By default write the playlist.
"""
os.makedirs(os.path.dirname(self.path), exist_ok = True)
with open(self.path, 'w') as file:
file.write('\n'.join(self.playlist or []))
def activate(self, value = True):
"""
Activate/Deactivate current source. May be different from the
containing Source.
"""
pass
class Monitor:
station = None

13
controllers/settings.py Executable file
View File

@ -0,0 +1,13 @@
import os
import stat
from django.conf import settings
def ensure (key, default):
globals()[key] = getattr(settings, key, default)
# Working directory for the controllers
ensure('AIRCOX_CONTROLLERS_MEDIA', '/tmp/aircox')

View File

@ -340,16 +340,9 @@ class Controller:
if not config:
return
log_script = main_settings.BASE_DIR \
if hasattr(main_settings, 'BASE_DIR') else \
main_settings.PROJECT_ROOT
log_script = os.path.join(log_script, 'manage.py') + \
' liquidsoap_log'
context = {
'controller': self,
'settings': settings,
'log_script': log_script,
}
# FIXME: remove this crappy thing

View File

@ -26,6 +26,7 @@
- admin cms
-> sections/actions and django decorator?
-> enhance calendar with possible actions?
- sections id generator
- website:
- diffusions:
@ -38,6 +39,7 @@
- list of played diffusions and tracks when non-stop;
# Long term TODO
- automatic cancel of passed diffusion based on logs
- sounds monitor: max_size of path, take in account
- logs: archive functionnality
- track stats for diffusions
@ -46,6 +48,7 @@
- view as grid
- actions -> noscript case, think of accessibility
- comments -> remove/edit by the author
- integrate logs for tracks + in on air

View File

@ -147,12 +147,6 @@ class DiffusionAdmin(admin.ModelAdmin):
return qs.exclude(type = Diffusion.Type.unconfirmed)
@admin.register(Log)
class LogAdmin(admin.ModelAdmin):
list_display = ['id', 'date', 'source', 'comment', 'related_object']
list_filter = ['date', 'source', 'related_type']
@admin.register(Schedule)
class ScheduleAdmin(admin.ModelAdmin):
def program_name(self, obj):

View File

@ -713,77 +713,3 @@ class Diffusion(models.Model):
('programming', _('edit the diffusion\'s planification')),
)
class Log(models.Model):
"""
Log sounds and diffusions that are played in the streamer. It
can also be used for other purposes.
"""
class Type(IntEnum):
stop = 0x00
"""
Source has been stopped (only when there is no more sound)
"""
play = 0x01
"""
Source has been started/changed and is running related_object
If no related_object is available, comment is used to designate
the sound.
"""
load = 0x02
"""
Source starts to be preload related_object
"""
type = models.SmallIntegerField(
verbose_name = _('type'),
choices = [ (int(y), _(x)) for x,y in Type.__members__.items() ],
blank = True, null = True,
)
source = models.CharField(
_('source'),
max_length = 64,
help_text = 'source information',
blank = True, null = True,
)
date = models.DateTimeField(
_('date'),
auto_now_add=True,
)
comment = models.CharField(
_('comment'),
max_length = 512,
blank = True, null = True,
)
related_type = models.ForeignKey(
ContentType,
blank = True, null = True,
)
related_id = models.PositiveIntegerField(
blank = True, null = True,
)
related_object = GenericForeignKey(
'related_type', 'related_id',
)
@classmethod
def get_for_related_model(cl, model):
"""
Return a queryset that filter related_type to the given one.
"""
return cl.objects.filter(related_type__pk =
ContentType.objects.get_for_model(model).id)
def print(self):
logger.info('log #%s: %s%s',
str(self),
self.comment or '',
' -- {} #{}'.format(self.related_type, self.related_id)
if self.related_object else ''
)
def __str__(self):
return '#{} ({}, {})'.format(
self.id, self.date.strftime('%Y-%m-%d %H:%M'), self.source
)