add missing files + move Monitor class from commands to controllers.

This commit is contained in:
bkfox 2023-06-17 23:57:58 +02:00
parent a7f39c3628
commit 95f5dca683
4 changed files with 345 additions and 273 deletions

View File

@ -5,6 +5,7 @@ from .metadata import Metadata, Request
from .streamer import Streamer
from .streamers import Streamers
from .sources import Source, PlaylistSource, QueueSource
from .monitor import Monitor
streamers = Streamers()
@ -19,4 +20,6 @@ __all__ = (
"Source",
"PlaylistSource",
"QueueSource",
"Monitor",
"streamers",
)

View File

@ -0,0 +1,278 @@
# TODO:
# x controllers: remaining
# x diffusion conflicts
# x cancel
# x when liquidsoap fails to start/exists: exit
# - handle restart after failure
# - is stream restart after live ok?
import pytz
from django.utils import timezone as tz
from aircox.models import Diffusion, Log, Sound, Track
# force using UTC
tz.activate(pytz.UTC)
class Monitor:
"""Log and launch diffusions for the given station.
Monitor should be able to be used after a crash a go back
where it was playing, so we heavily use logs to be able to
do that.
We keep trace of played items on the generated stream:
- sounds played on this stream;
- scheduled diffusions
- tracks for sounds of streamed programs
"""
streamer = None
"""Streamer controller."""
delay = None
""" Timedelta: minimal delay between two call of monitor. """
logs = None
"""Queryset to station's logs (ordered by -pk)"""
cancel_timeout = 20
"""Timeout in minutes before cancelling a diffusion."""
sync_timeout = 5
"""Timeout in minutes between two streamer's sync."""
sync_next = None
"""Datetime of the next sync."""
last_sound_logs = None
"""Last logged sounds, as ``{source_id: log}``."""
@property
def station(self):
return self.streamer.station
@property
def last_log(self):
"""Last log of monitored station."""
return self.logs.first()
@property
def last_diff_start(self):
"""Log of last triggered item (sound or diffusion)."""
return self.logs.start().with_diff().first()
def __init__(self, streamer, delay, cancel_timeout, **kwargs):
self.streamer = streamer
# adding time ensure all calculation have a margin
self.delay = delay + tz.timedelta(seconds=5)
self.cancel_timeout = cancel_timeout
self.__dict__.update(kwargs)
self.logs = self.get_logs_queryset()
self.init_last_sound_logs()
def get_logs_queryset(self):
"""Return queryset to assign as `self.logs`"""
return self.station.log_set.select_related(
"diffusion", "sound", "track"
).order_by("-pk")
def init_last_sound_logs(self, key=None):
"""Retrieve last logs and initialize `last_sound_logs`"""
logs = {}
for source in self.streamer.sources:
qs = self.logs.filter(source=source.id, sound__isnull=False)
logs[source.id] = qs.first()
self.last_sound_logs = logs
def monitor(self):
"""Run all monitoring functions once."""
if not self.streamer.is_ready:
return
self.streamer.fetch()
# Skip tracing - analyzis:
# Reason: multiple database request every x seconds, reducing it.
# We could skip this part when remaining time is higher than a minimal
# value (which should be derived from Command's delay). Problems:
# - How to trace tracks? (+ Source can change: caching log might sucks)
# - if liquidsoap's source/request changes: remaining time goes higher,
# thus avoiding fetch
#
# Approach: something like having a mean time, such as:
#
# ```
# source = stream.source
# mean_time = source.air_time
# + min(next_track.timestamp, source.remaining)
# - (command.delay + 1)
# trace_required = \/ source' != source
# \/ source.uri' != source.uri
# \/ now < mean_time
# ```
#
source = self.streamer.source
if source and source.uri:
log = self.trace_sound(source)
if log:
self.trace_tracks(log)
else:
print("no source or sound for stream; source = ", source)
self.handle_diffusions()
self.sync()
def log(self, source, **kwargs):
"""Create a log using **kwargs, and print info."""
kwargs.setdefault("station", self.station)
kwargs.setdefault("date", tz.now())
log = Log(source=source, **kwargs)
log.save()
log.print()
if log.sound:
self.last_sound_logs[source] = log
return log
def trace_sound(self, source):
"""Return on air sound log (create if not present)."""
air_uri, air_time = source.uri, source.air_time
last_log = self.last_sound_logs.get(source.id)
if last_log and last_log.sound.file.path == source.uri:
return last_log
# FIXME: can be a sound played when no Sound instance? If not, remove
# comment.
# check if there is yet a log for this sound on the source
# log = self.logs.on_air().filter(
# Q(sound__file=air_uri) |
# # sound can be null when arbitrary sound file is played
# Q(sound__isnull=True, track__isnull=True, comment=air_uri),
# source=source.id,
# date__range=date_range(air_time, self.delay),
# ).first()
# if log:
# return log
# get sound
diff = None
sound = Sound.objects.path(air_uri).first()
if sound and sound.episode_id is not None:
diff = (
Diffusion.objects.episode(id=sound.episode_id)
.on_air()
.now(air_time)
.first()
)
# log sound on air
return self.log(
type=Log.TYPE_ON_AIR,
date=source.air_time,
source=source.id,
sound=sound,
diffusion=diff,
comment=air_uri,
)
def trace_tracks(self, log):
"""Log tracks for the given sound log (for streamed programs only)."""
if log.diffusion:
return
tracks = Track.objects.filter(
sound__id=log.sound_id, timestamp__isnull=False
).order_by("timestamp")
if not tracks.exists():
return
# exclude already logged tracks
tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk)
now = tz.now()
for track in tracks:
pos = log.date + tz.timedelta(seconds=track.timestamp)
if pos > now:
break
self.log(
type=Log.TYPE_ON_AIR,
date=pos,
source=log.source,
track=track,
comment=track,
)
def handle_diffusions(self):
"""Handle scheduled diffusion, trigger if needed, preload playlists and
so on."""
# TODO: program restart
# Diffusion conflicts are handled by the way a diffusion is defined
# as candidate for the next dealer's start.
#
# ```
# logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START
# /\ log.diff = diff
# /\ log.date = diff.start
# queue_empty: /\ dealer.queue is empty
# /\ \/ ~dealer.on_air
# \/ dealer.remaining < delay
#
# start_allowed: /\ diff not in logged_diff
# /\ queue_empty
#
# start_canceled: /\ diff not in logged diff
# /\ ~queue_empty
# /\ diff.start < now + cancel_timeout
# ```
#
now = tz.now()
diff = (
Diffusion.objects.station(self.station)
.on_air()
.now(now)
.filter(episode__sound__type=Sound.TYPE_ARCHIVE)
.first()
)
# Can't use delay: diffusion may start later than its assigned start.
log = None if not diff else self.logs.start().filter(diffusion=diff)
if not diff or log:
return
dealer = self.streamer.dealer
# start
if (
not dealer.queue
and dealer.rid is None
or dealer.remaining < self.delay.total_seconds()
):
self.start_diff(dealer, diff)
# cancel
if diff.start < now - self.cancel_timeout:
self.cancel_diff(dealer, diff)
def start_diff(self, source, diff):
playlist = Sound.objects.episode(id=diff.episode_id).playlist()
source.push(*playlist)
self.log(
type=Log.TYPE_START,
source=source.id,
diffusion=diff,
comment=str(diff),
)
def cancel_diff(self, source, diff):
diff.type = Diffusion.TYPE_CANCEL
diff.save()
self.log(
type=Log.TYPE_CANCEL,
source=source.id,
diffusion=diff,
comment=str(diff),
)
def sync(self):
"""Update sources' playlists."""
now = tz.now()
if self.sync_next is not None and now < self.sync_next:
return
self.sync_next = now + tz.timedelta(minutes=self.sync_timeout)
for source in self.streamer.playlists:
source.sync()

View File

@ -0,0 +1,62 @@
from django.utils import timezone as tz
from aircox.models import Station
from .streamer import Streamer
__all__ = ("Streamers",)
class Streamers:
"""Keep multiple streamers in memory, allow fetching informations."""
streamers = None
"""Stations by station id."""
streamer_class = Streamer
timeout = None
"""Timedelta to next update."""
next_date = None
"""Next update datetime."""
def __init__(self, timeout=None, streamer_class=streamer_class):
self.timeout = timeout or tz.timedelta(seconds=2)
def reset(self, stations=Station.objects.active()):
# FIXME: cf. TODO in aircox.controllers about model updates
stations = stations.all()
self.streamers = {
station.pk: self.streamer_class(station) for station in stations
}
def fetch(self):
"""Call streamers fetch if timed-out."""
if self.streamers is None:
self.reset()
now = tz.now()
if self.next_date is not None and now < self.next_date:
return
for streamer in self.streamers.values():
streamer.fetch()
self.next_date = now + self.timeout
def get(self, key, default=None):
return self.streamers.get(key, default)
def values(self):
return self.streamers.values()
def __len__(self):
return self.streamers and len(self.streamers) or 0
def __getitem__(self, key):
return self.streamers[key]
def __contains__(self, key):
"""Key can be a Station or a Station id."""
if isinstance(key, Station):
return key.pk in self.streamers
return key in self.streamers
def __iter__(self):
return self.streamers.values() if self.streamers else iter(tuple())

View File

@ -8,290 +8,19 @@ to:
"""
import time
# TODO:
# x controllers: remaining
# x diffusion conflicts
# x cancel
# x when liquidsoap fails to start/exists: exit
# - handle restart after failure
# - is stream restart after live ok?
from argparse import RawTextHelpFormatter
import pytz
from django.core.management.base import BaseCommand
from django.utils import timezone as tz
from aircox.models import Diffusion, Log, Sound, Station, Track
from aircox_streamer.controllers import Streamer
from aircox.models import Station
from aircox_streamer.controllers import Monitor, Streamer
# force using UTC
tz.activate(pytz.UTC)
class Monitor:
"""Log and launch diffusions for the given station.
Monitor should be able to be used after a crash a go back
where it was playing, so we heavily use logs to be able to
do that.
We keep trace of played items on the generated stream:
- sounds played on this stream;
- scheduled diffusions
- tracks for sounds of streamed programs
"""
streamer = None
"""Streamer controller."""
delay = None
""" Timedelta: minimal delay between two call of monitor. """
logs = None
"""Queryset to station's logs (ordered by -pk)"""
cancel_timeout = 20
"""Timeout in minutes before cancelling a diffusion."""
sync_timeout = 5
"""Timeout in minutes between two streamer's sync."""
sync_next = None
"""Datetime of the next sync."""
last_sound_logs = None
"""Last logged sounds, as ``{source_id: log}``."""
@property
def station(self):
return self.streamer.station
@property
def last_log(self):
"""Last log of monitored station."""
return self.logs.first()
@property
def last_diff_start(self):
"""Log of last triggered item (sound or diffusion)."""
return self.logs.start().with_diff().first()
def __init__(self, streamer, delay, cancel_timeout, **kwargs):
self.streamer = streamer
# adding time ensure all calculation have a margin
self.delay = delay + tz.timedelta(seconds=5)
self.cancel_timeout = cancel_timeout
self.__dict__.update(kwargs)
self.logs = self.get_logs_queryset()
self.init_last_sound_logs()
def get_logs_queryset(self):
"""Return queryset to assign as `self.logs`"""
return self.station.log_set.select_related(
"diffusion", "sound", "track"
).order_by("-pk")
def init_last_sound_logs(self, key=None):
"""Retrieve last logs and initialize `last_sound_logs`"""
logs = {}
for source in self.streamer.sources:
qs = self.logs.filter(source=source.id, sound__isnull=False)
logs[source.id] = qs.first()
self.last_sound_logs = logs
def monitor(self):
"""Run all monitoring functions once."""
if not self.streamer.is_ready:
return
self.streamer.fetch()
# Skip tracing - analyzis:
# Reason: multiple database request every x seconds, reducing it.
# We could skip this part when remaining time is higher than a minimal
# value (which should be derived from Command's delay). Problems:
# - How to trace tracks? (+ Source can change: caching log might sucks)
# - if liquidsoap's source/request changes: remaining time goes higher,
# thus avoiding fetch
#
# Approach: something like having a mean time, such as:
#
# ```
# source = stream.source
# mean_time = source.air_time
# + min(next_track.timestamp, source.remaining)
# - (command.delay + 1)
# trace_required = \/ source' != source
# \/ source.uri' != source.uri
# \/ now < mean_time
# ```
#
source = self.streamer.source
if source and source.uri:
log = self.trace_sound(source)
if log:
self.trace_tracks(log)
else:
print("no source or sound for stream; source = ", source)
self.handle_diffusions()
self.sync()
def log(self, source, **kwargs):
"""Create a log using **kwargs, and print info."""
kwargs.setdefault("station", self.station)
kwargs.setdefault("date", tz.now())
log = Log(source=source, **kwargs)
log.save()
log.print()
if log.sound:
self.last_sound_logs[source] = log
return log
def trace_sound(self, source):
"""Return on air sound log (create if not present)."""
air_uri, air_time = source.uri, source.air_time
last_log = self.last_sound_logs.get(source.id)
if last_log and last_log.sound.file.path == source.uri:
return last_log
# FIXME: can be a sound played when no Sound instance? If not, remove
# comment.
# check if there is yet a log for this sound on the source
# log = self.logs.on_air().filter(
# Q(sound__file=air_uri) |
# # sound can be null when arbitrary sound file is played
# Q(sound__isnull=True, track__isnull=True, comment=air_uri),
# source=source.id,
# date__range=date_range(air_time, self.delay),
# ).first()
# if log:
# return log
# get sound
diff = None
sound = Sound.objects.path(air_uri).first()
if sound and sound.episode_id is not None:
diff = (
Diffusion.objects.episode(id=sound.episode_id)
.on_air()
.now(air_time)
.first()
)
# log sound on air
return self.log(
type=Log.TYPE_ON_AIR,
date=source.air_time,
source=source.id,
sound=sound,
diffusion=diff,
comment=air_uri,
)
def trace_tracks(self, log):
"""Log tracks for the given sound log (for streamed programs only)."""
if log.diffusion:
return
tracks = Track.objects.filter(
sound__id=log.sound_id, timestamp__isnull=False
).order_by("timestamp")
if not tracks.exists():
return
# exclude already logged tracks
tracks = tracks.exclude(log__station=self.station, log__pk__gt=log.pk)
now = tz.now()
for track in tracks:
pos = log.date + tz.timedelta(seconds=track.timestamp)
if pos > now:
break
self.log(
type=Log.TYPE_ON_AIR,
date=pos,
source=log.source,
track=track,
comment=track,
)
def handle_diffusions(self):
"""Handle scheduled diffusion, trigger if needed, preload playlists and
so on."""
# TODO: program restart
# Diffusion conflicts are handled by the way a diffusion is defined
# as candidate for the next dealer's start.
#
# ```
# logged_diff: /\ \A diff in diffs: \E log: /\ log.type = START
# /\ log.diff = diff
# /\ log.date = diff.start
# queue_empty: /\ dealer.queue is empty
# /\ \/ ~dealer.on_air
# \/ dealer.remaining < delay
#
# start_allowed: /\ diff not in logged_diff
# /\ queue_empty
#
# start_canceled: /\ diff not in logged diff
# /\ ~queue_empty
# /\ diff.start < now + cancel_timeout
# ```
#
now = tz.now()
diff = (
Diffusion.objects.station(self.station)
.on_air()
.now(now)
.filter(episode__sound__type=Sound.TYPE_ARCHIVE)
.first()
)
# Can't use delay: diffusion may start later than its assigned start.
log = None if not diff else self.logs.start().filter(diffusion=diff)
if not diff or log:
return
dealer = self.streamer.dealer
# start
if (
not dealer.queue
and dealer.rid is None
or dealer.remaining < self.delay.total_seconds()
):
self.start_diff(dealer, diff)
# cancel
if diff.start < now - self.cancel_timeout:
self.cancel_diff(dealer, diff)
def start_diff(self, source, diff):
playlist = Sound.objects.episode(id=diff.episode_id).playlist()
source.push(*playlist)
self.log(
type=Log.TYPE_START,
source=source.id,
diffusion=diff,
comment=str(diff),
)
def cancel_diff(self, source, diff):
diff.type = Diffusion.TYPE_CANCEL
diff.save()
self.log(
type=Log.TYPE_CANCEL,
source=source.id,
diffusion=diff,
comment=str(diff),
)
def sync(self):
"""Update sources' playlists."""
now = tz.now()
if self.sync_next is not None and now < self.sync_next:
return
self.sync_next = now + tz.timedelta(minutes=self.sync_timeout)
for source in self.streamer.playlists:
source.sync()
class Command(BaseCommand):
help = __doc__