From e162b652b84f12b2378580542fcc2320eea3d733 Mon Sep 17 00:00:00 2001 From: bkfox Date: Sun, 16 Jul 2017 14:30:46 +0200 Subject: [PATCH] work on streamer, change a bit how it works; make archiving --- aircox/management/commands/streamer.py | 128 ++++++++++++++++--------- aircox/models.py | 121 ++++++++++++++++++++--- 2 files changed, 192 insertions(+), 57 deletions(-) diff --git a/aircox/management/commands/streamer.py b/aircox/management/commands/streamer.py index 97ec097..afbcdaa 100755 --- a/aircox/management/commands/streamer.py +++ b/aircox/management/commands/streamer.py @@ -14,6 +14,7 @@ from argparse import RawTextHelpFormatter from django.conf import settings as main_settings from django.core.management.base import BaseCommand, CommandError from django.utils import timezone as tz +from django.utils.functional import cached_property from aircox.models import Station, Diffusion, Track, Sound, Log #, DiffusionLog, SoundLog @@ -54,17 +55,41 @@ class Monitor: Datetime of the next sync """ - _last_log = None - """ - Last emitted log - """ + + def _last_log(self, **kwargs): + return Log.objects.station(self.station, **kwargs) \ + .select_related('diffusion', 'sound') \ + .order_by('date').last() + + @property + def last_log(self): + """ + Last log of monitored station + """ + return self._last_log() + + @property + def last_sound(self): + """ + Last sound log of monitored station that occurred on_air + """ + return self._last_log(type = Log.Type.on_air, + sound__isnull = False) + + @property + def last_diff_start(self): + """ + Log of last triggered item (sound or diffusion) + """ + return self._last_log(type = Log.Type.start, + diffusion__isnull = False) + def __init__(self, station, **kwargs): self.station = station self.__dict__.update(kwargs) - self._last_log = Log.objects.station(station).order_by('date') \ - .last() + now = tz.now() def monitor(self): """ @@ -91,9 +116,9 @@ class Monitor: # update last log if log.type != Log.Type.other and \ - self._last_log and not self._last_log.end: - self._last_log.end = log.date - self._last_log = log + self.last_log and not self.last_log.end: + self.last_log.end = log.date + return log def trace(self): """ @@ -106,51 +131,59 @@ class Monitor: if not current_sound or not current_source: return - log = Log.objects.station(self.station, sound__isnull = False) \ - .select_related('sound') \ - .order_by('date').last() + # last log can be anything, so we need to keep track of the last + # sound log too + # sound on air can be of a diffusion or a stream. - # only streamed ns - if log and not log.sound.diffusion: + log = self.last_log + + # sound on air changed + if log.source != current_source.id or \ + (log.sound and log.sound.path != current_sound): + sound = Sound.objects.filter(path = current_sound).first() + + # find diff + last_diff = self.last_diff_start + diff = None + if not last_diff.is_expired(): + archives = last_diff.diffusion.get_archives() + if archives.filter(pk = sound.pk).exists(): + diff = last_diff.diffusion + + # log sound on air + log = self.log( + type = Log.Type.on_air, + source = current_source.id, + date = tz.now(), + sound = sound, + diffusion = diff, + # keep sound path (if sound is removed, we keep that info) + comment = current_sound, + ) + + # tracks -- only for sound's + if not log.diffusion: self.trace_sound_tracks(log) - # TODO: expiration - if log and (log.source == current_source.id and \ - log.sound and - log.sound.path == current_sound): - return - - sound = Sound.objects.filter(path = current_sound).first() - self.log( - type = Log.Type.on_air, - source = current_source.id, - date = tz.now(), - sound = sound, - # keep sound path (if sound is removed, we keep that info) - comment = current_sound, - ) def trace_sound_tracks(self, log): """ - Log tracks for the given sound (for streamed programs); Called by - self.trace + Log tracks for the given sound log (for streamed programs). + Called by self.trace """ - logs = Log.objects.station(self.station, - track__isnull = False, - pk__gt = log.pk) \ - .values_list('sound__pk', flat = True) - tracks = Track.objects.get_for(object = log.sound) \ .filter(in_seconds = True) - if tracks and len(tracks) == len(logs): + if not tracks.exists(): return - tracks = tracks.exclude(pk__in = logs).order_by('position') + tracks = tracks.exclude(log__station = self.station, + log__pk__gt = log.pk) now = tz.now() for track in tracks: pos = log.date + tz.timedelta(seconds = track.position) if pos > now: break + # log track on air self.log( type = Log.Type.on_air, source = log.source, date = pos, track = track, @@ -195,6 +228,7 @@ class Monitor: if diff.start < now: diff.type = Diffusion.Type.canceled diff.save() + # log canceled diffusion self.log( type = Log.Type.other, diffusion = diff, @@ -254,13 +288,17 @@ class Monitor: """ Update playlist of a source if required, and handle logging when it is needed. + + - source: source on which it happens + - playlist: list of sounds to use to update + - diff: related diffusion """ - dealer = self.station.dealer - if dealer.playlist == playlist: + if source.playlist == playlist: return - dealer.playlist = playlist + source.playlist = playlist if diff and not diff.is_live(): + # log diffusion archive load self.log(type = Log.Type.load, source = source.id, diffusion = diff, @@ -284,6 +322,7 @@ class Monitor: diff_ = Log.objects.station(self.station) \ .filter(diffusion = diff, type = Log.Type.on_air) if not diff_.count(): + # log live diffusion self.log(type = Log.Type.on_air, source = source.id, diffusion = diff, date = date) return @@ -291,8 +330,11 @@ class Monitor: # enable dealer if not source.active: source.active = True - self.log(type = Log.Type.play, source = source.id, - diffusion = diff, date = date) + last_start = self.last_start + if last_start.diffusion_id != diff.pk: + # log triggered diffusion + self.log(type = Log.Type.start, source = source.id, + diffusion = diff, date = date) def handle(self): """ diff --git a/aircox/models.py b/aircox/models.py index 3438ca9..ae04f08 100755 --- a/aircox/models.py +++ b/aircox/models.py @@ -141,7 +141,7 @@ class Track(Related): ) def __str__(self): - return '{self.artist} -- {self.title}'.format(self=self) + return '{self.artist} -- {self.title} -- {self.position}'.format(self=self) class Meta: verbose_name = _('Track') @@ -245,6 +245,10 @@ class Station(Nameable): If date is not specified, count MUST be set to a non-zero value. Be careful with what you which for: the result is a plain list. + + It is different from Station.played method since it filters out + elements that should have not been on air, such as a stream that + has been played when there was a live diffusion. """ # FIXME: as an iterator? # TODO argument to get sound instead of tracks @@ -1209,7 +1213,8 @@ class LogManager(models.Manager): qs = self._at(date, qs, **kwargs) return self.station(station, qs) if station else qs - def played(self, station, archives = True, **kwargs): + # TODO: rename on_air + rename Station.on_air into sth like regular_on_air + def played(self, station, archives = True, date = None, **kwargs): """ Return a queryset of the played elements' log for the given station and model. This queryset is ordered by date ascending @@ -1217,11 +1222,18 @@ class LogManager(models.Manager): * station: related station * archives: if false, exclude log of diffusion's archives from the queryset; + * date: get played logs at the given date only * include_live: include diffusion that have no archive * kwargs: extra filter kwargs """ - qs = self.filter(type__in = (Log.Type.play, Log.Type.on_air), - **kwargs) + if date: + qs = self.at(station, date) + else: + qs = self + + qs = qs.filter( + type__in = (Log.Type.start, Log.Type.on_air), **kwargs + ) if not archives and station.dealer: qs = qs.exclude( @@ -1230,6 +1242,73 @@ class LogManager(models.Manager): ) return qs.order_by('date') + @staticmethod + def _get_archive_path(station, date): + # note: station name is not included in order to avoid problems + # of retrieving archive when it changes + return os.path.join( + settings.AIRCOX_LOGS_ARCHIVES_DIR, + # FIXME: number format + '{}{}{}_{}.log.gz'.format( + date.year, date.month, date.day, station.pk + ) + ) + + def load_archive(self, station, date): + """ + Return archived logs for a specific date as a list + """ + import yaml + import gzip + + path = self._get_archive_path(station, date) + if not os.path.exists(path): + return [] + + with gzip.open(path, 'rb') as archive: + data = archive.read() + logs = yaml.load(data) + return logs + + def make_archive(self, station, date, force = False, keep = False): + """ + Archive logs of the given date. If the archive exists, it does + not overwrite it except if "force" is given. In this case, the + new elements will be appended to the existing archives. + + Return the number of archived logs, -1 if archive could not be + created. + """ + import yaml + import gzip + + os.makedirs(settings.AIRCOX_LOGS_ARCHIVES_DIR, exist_ok = True) + path = self._get_archive_path(station, date); + if os.path.exists(path) and not force: + return -1 + + qs = self.at(station, date) + if not qs.exists(): + return 0 + + fields = Log._meta.get_fields() + logs = [ + { + i.attname: getattr(log, i.attname) + for i in fields + } + for log in qs + ] + + with gzip.open(path, 'ab') as archive: + data = yaml.dump(logs).encode('utf8') + archive.write(data) + # TODO: delete logs + + if not keep: + qs.delete() + return len(logs) + class Log(models.Model): """ @@ -1241,20 +1320,25 @@ class Log(models.Model): class Type(IntEnum): stop = 0x00 """ - Source has been stopped (only when there is no more sound) + Source has been stopped, e.g. manually """ - play = 0x01 + start = 0x01 """ - The related item has been started by the streamer or manually, - and occured on air. + The diffusion or sound has been triggered by the streamer or + manually. """ load = 0x02 """ - Source starts to be preload related_object + A playlist has updated, and loading started. A related Diffusion + does not means that the playlist is only for it (e.g. after a + crash, it can reload previous remaining sound files + thoses of + the next diffusion) """ on_air = 0x03 """ - The related item has been detected occuring on air + The sound or diffusion has been detected occurring on air. Can + also designate live diffusion, although Liquidsoap did not play + them since they don't have an attached sound archive. """ other = 0x04 """ @@ -1336,9 +1420,15 @@ class Log(models.Model): Return True if the log is expired. Note that it only check against the date, so it is still possible that the expiration occured because of a Stop or other source. + + For sound logs, also check against sound duration when + end == date (e.g after a crash) """ date = utils.date_or_default(date) - return self.end < date + end = self.end + if end == self.date and self.sound: + end = self.date + to_timedelta(self.sound.duration) + return end < date def print(self): r = [] @@ -1349,15 +1439,18 @@ class Log(models.Model): if self.track: r.append('track: ' + str(self.track_id)) - logger.info('log #%s: %s%s', + logger.info('log %s: %s%s', str(self), self.comment or '', ' (' + ', '.join(r) + ')' if r else '' ) def __str__(self): - return '#{} ({}, {})'.format( - self.pk, self.date.strftime('%Y/%m/%d %H:%M'), self.source + return '#{} ({}, {}, {})'.format( + self.pk, + self.get_type_display(), + self.source, + self.date.strftime('%Y/%m/%d %H:%M'), ) def save(self, *args, **kwargs):