work on streamer, change a bit how it works; make archiving
This commit is contained in:
		@ -14,6 +14,7 @@ from argparse import RawTextHelpFormatter
 | 
			
		||||
from django.conf import settings as main_settings
 | 
			
		||||
from django.core.management.base import BaseCommand, CommandError
 | 
			
		||||
from django.utils import timezone as tz
 | 
			
		||||
from django.utils.functional import cached_property
 | 
			
		||||
 | 
			
		||||
from aircox.models import Station, Diffusion, Track, Sound, Log #, DiffusionLog, SoundLog
 | 
			
		||||
 | 
			
		||||
@ -54,17 +55,41 @@ class Monitor:
 | 
			
		||||
    Datetime of the next sync
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    _last_log = None
 | 
			
		||||
    """
 | 
			
		||||
    Last emitted log
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def _last_log(self, **kwargs):
 | 
			
		||||
        return Log.objects.station(self.station, **kwargs) \
 | 
			
		||||
                  .select_related('diffusion', 'sound') \
 | 
			
		||||
                  .order_by('date').last()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def last_log(self):
 | 
			
		||||
        """
 | 
			
		||||
        Last log of monitored station
 | 
			
		||||
        """
 | 
			
		||||
        return self._last_log()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def last_sound(self):
 | 
			
		||||
        """
 | 
			
		||||
        Last sound log of monitored station that occurred on_air
 | 
			
		||||
        """
 | 
			
		||||
        return self._last_log(type = Log.Type.on_air,
 | 
			
		||||
                              sound__isnull = False)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def last_diff_start(self):
 | 
			
		||||
        """
 | 
			
		||||
        Log of last triggered item (sound or diffusion)
 | 
			
		||||
        """
 | 
			
		||||
        return self._last_log(type = Log.Type.start,
 | 
			
		||||
                              diffusion__isnull = False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def __init__(self, station, **kwargs):
 | 
			
		||||
        self.station = station
 | 
			
		||||
        self.__dict__.update(kwargs)
 | 
			
		||||
 | 
			
		||||
        self._last_log = Log.objects.station(station).order_by('date') \
 | 
			
		||||
                            .last()
 | 
			
		||||
        now = tz.now()
 | 
			
		||||
 | 
			
		||||
    def monitor(self):
 | 
			
		||||
        """
 | 
			
		||||
@ -91,9 +116,9 @@ class Monitor:
 | 
			
		||||
 | 
			
		||||
        # update last log
 | 
			
		||||
        if log.type != Log.Type.other and \
 | 
			
		||||
                self._last_log and not self._last_log.end:
 | 
			
		||||
            self._last_log.end = log.date
 | 
			
		||||
        self._last_log = log
 | 
			
		||||
                self.last_log and not self.last_log.end:
 | 
			
		||||
            self.last_log.end = log.date
 | 
			
		||||
        return log
 | 
			
		||||
 | 
			
		||||
    def trace(self):
 | 
			
		||||
        """
 | 
			
		||||
@ -106,51 +131,59 @@ class Monitor:
 | 
			
		||||
        if not current_sound or not current_source:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        log = Log.objects.station(self.station, sound__isnull = False) \
 | 
			
		||||
                         .select_related('sound') \
 | 
			
		||||
                         .order_by('date').last()
 | 
			
		||||
        # last log can be anything, so we need to keep track of the last
 | 
			
		||||
        # sound log too
 | 
			
		||||
        # sound on air can be of a diffusion or a stream.
 | 
			
		||||
 | 
			
		||||
        # only streamed ns
 | 
			
		||||
        if log and not log.sound.diffusion:
 | 
			
		||||
        log = self.last_log
 | 
			
		||||
 | 
			
		||||
        # sound on air changed
 | 
			
		||||
        if log.source != current_source.id or \
 | 
			
		||||
                (log.sound and log.sound.path != current_sound):
 | 
			
		||||
            sound = Sound.objects.filter(path = current_sound).first()
 | 
			
		||||
 | 
			
		||||
            # find diff
 | 
			
		||||
            last_diff = self.last_diff_start
 | 
			
		||||
            diff = None
 | 
			
		||||
            if not last_diff.is_expired():
 | 
			
		||||
                archives = last_diff.diffusion.get_archives()
 | 
			
		||||
                if archives.filter(pk = sound.pk).exists():
 | 
			
		||||
                    diff = last_diff.diffusion
 | 
			
		||||
 | 
			
		||||
            # log sound on air
 | 
			
		||||
            log = self.log(
 | 
			
		||||
                type = Log.Type.on_air,
 | 
			
		||||
                source = current_source.id,
 | 
			
		||||
                date = tz.now(),
 | 
			
		||||
                sound = sound,
 | 
			
		||||
                diffusion = diff,
 | 
			
		||||
                # keep sound path (if sound is removed, we keep that info)
 | 
			
		||||
                comment = current_sound,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # tracks -- only for sound's
 | 
			
		||||
        if not log.diffusion:
 | 
			
		||||
            self.trace_sound_tracks(log)
 | 
			
		||||
 | 
			
		||||
        # TODO: expiration
 | 
			
		||||
        if log and (log.source == current_source.id and \
 | 
			
		||||
                log.sound and
 | 
			
		||||
                log.sound.path == current_sound):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        sound = Sound.objects.filter(path = current_sound).first()
 | 
			
		||||
        self.log(
 | 
			
		||||
            type = Log.Type.on_air,
 | 
			
		||||
            source = current_source.id,
 | 
			
		||||
            date = tz.now(),
 | 
			
		||||
            sound = sound,
 | 
			
		||||
            # keep sound path (if sound is removed, we keep that info)
 | 
			
		||||
            comment = current_sound,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def trace_sound_tracks(self, log):
 | 
			
		||||
        """
 | 
			
		||||
        Log tracks for the given sound (for streamed programs); Called by
 | 
			
		||||
        self.trace
 | 
			
		||||
        Log tracks for the given sound log (for streamed programs).
 | 
			
		||||
        Called by self.trace
 | 
			
		||||
        """
 | 
			
		||||
        logs = Log.objects.station(self.station,
 | 
			
		||||
                                   track__isnull = False,
 | 
			
		||||
                                   pk__gt = log.pk) \
 | 
			
		||||
                          .values_list('sound__pk', flat = True)
 | 
			
		||||
 | 
			
		||||
        tracks = Track.objects.get_for(object = log.sound) \
 | 
			
		||||
                              .filter(in_seconds = True)
 | 
			
		||||
        if tracks and len(tracks) == len(logs):
 | 
			
		||||
        if not tracks.exists():
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        tracks = tracks.exclude(pk__in = logs).order_by('position')
 | 
			
		||||
        tracks = tracks.exclude(log__station = self.station,
 | 
			
		||||
                                log__pk__gt = log.pk)
 | 
			
		||||
        now = tz.now()
 | 
			
		||||
        for track in tracks:
 | 
			
		||||
            pos = log.date + tz.timedelta(seconds = track.position)
 | 
			
		||||
            if pos > now:
 | 
			
		||||
                break
 | 
			
		||||
            # log track on air
 | 
			
		||||
            self.log(
 | 
			
		||||
                type = Log.Type.on_air, source = log.source,
 | 
			
		||||
                date = pos, track = track,
 | 
			
		||||
@ -195,6 +228,7 @@ class Monitor:
 | 
			
		||||
            if diff.start < now:
 | 
			
		||||
                diff.type = Diffusion.Type.canceled
 | 
			
		||||
                diff.save()
 | 
			
		||||
                # log canceled diffusion
 | 
			
		||||
                self.log(
 | 
			
		||||
                    type = Log.Type.other,
 | 
			
		||||
                    diffusion = diff,
 | 
			
		||||
@ -254,13 +288,17 @@ class Monitor:
 | 
			
		||||
        """
 | 
			
		||||
        Update playlist of a source if required, and handle logging when
 | 
			
		||||
        it is needed.
 | 
			
		||||
 | 
			
		||||
        - source: source on which it happens
 | 
			
		||||
        - playlist: list of sounds to use to update
 | 
			
		||||
        - diff: related diffusion
 | 
			
		||||
        """
 | 
			
		||||
        dealer = self.station.dealer
 | 
			
		||||
        if dealer.playlist == playlist:
 | 
			
		||||
        if source.playlist == playlist:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        dealer.playlist = playlist
 | 
			
		||||
        source.playlist = playlist
 | 
			
		||||
        if diff and not diff.is_live():
 | 
			
		||||
            # log diffusion archive load
 | 
			
		||||
            self.log(type = Log.Type.load,
 | 
			
		||||
                     source = source.id,
 | 
			
		||||
                     diffusion = diff,
 | 
			
		||||
@ -284,6 +322,7 @@ class Monitor:
 | 
			
		||||
            diff_ = Log.objects.station(self.station) \
 | 
			
		||||
                       .filter(diffusion = diff, type = Log.Type.on_air)
 | 
			
		||||
            if not diff_.count():
 | 
			
		||||
                # log live diffusion
 | 
			
		||||
                self.log(type = Log.Type.on_air, source = source.id,
 | 
			
		||||
                         diffusion = diff, date = date)
 | 
			
		||||
            return
 | 
			
		||||
@ -291,8 +330,11 @@ class Monitor:
 | 
			
		||||
        # enable dealer
 | 
			
		||||
        if not source.active:
 | 
			
		||||
            source.active = True
 | 
			
		||||
            self.log(type = Log.Type.play, source = source.id,
 | 
			
		||||
                     diffusion = diff, date = date)
 | 
			
		||||
            last_start = self.last_start
 | 
			
		||||
            if last_start.diffusion_id != diff.pk:
 | 
			
		||||
                # log triggered diffusion
 | 
			
		||||
                self.log(type = Log.Type.start, source = source.id,
 | 
			
		||||
                         diffusion = diff, date = date)
 | 
			
		||||
 | 
			
		||||
    def handle(self):
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										121
									
								
								aircox/models.py
									
									
									
									
									
								
							
							
						
						
									
										121
									
								
								aircox/models.py
									
									
									
									
									
								
							@ -141,7 +141,7 @@ class Track(Related):
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return '{self.artist} -- {self.title}'.format(self=self)
 | 
			
		||||
        return '{self.artist} -- {self.title} -- {self.position}'.format(self=self)
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
        verbose_name = _('Track')
 | 
			
		||||
@ -245,6 +245,10 @@ class Station(Nameable):
 | 
			
		||||
 | 
			
		||||
        If date is not specified, count MUST be set to a non-zero value.
 | 
			
		||||
        Be careful with what you which for: the result is a plain list.
 | 
			
		||||
 | 
			
		||||
        It is different from Station.played method since it filters out
 | 
			
		||||
        elements that should have not been on air, such as a stream that
 | 
			
		||||
        has been played when there was a live diffusion.
 | 
			
		||||
        """
 | 
			
		||||
        # FIXME: as an iterator?
 | 
			
		||||
        # TODO argument to get sound instead of tracks
 | 
			
		||||
@ -1209,7 +1213,8 @@ class LogManager(models.Manager):
 | 
			
		||||
        qs = self._at(date, qs, **kwargs)
 | 
			
		||||
        return self.station(station, qs) if station else qs
 | 
			
		||||
 | 
			
		||||
    def played(self, station, archives = True, **kwargs):
 | 
			
		||||
    # TODO: rename on_air + rename Station.on_air into sth like regular_on_air
 | 
			
		||||
    def played(self, station, archives = True, date = None, **kwargs):
 | 
			
		||||
        """
 | 
			
		||||
        Return a queryset of the played elements' log for the given
 | 
			
		||||
        station and model. This queryset is ordered by date ascending
 | 
			
		||||
@ -1217,11 +1222,18 @@ class LogManager(models.Manager):
 | 
			
		||||
        * station: related station
 | 
			
		||||
        * archives: if false, exclude log of diffusion's archives from
 | 
			
		||||
            the queryset;
 | 
			
		||||
        * date: get played logs at the given date only
 | 
			
		||||
        * include_live: include diffusion that have no archive
 | 
			
		||||
        * kwargs: extra filter kwargs
 | 
			
		||||
        """
 | 
			
		||||
        qs = self.filter(type__in = (Log.Type.play, Log.Type.on_air),
 | 
			
		||||
                         **kwargs)
 | 
			
		||||
        if date:
 | 
			
		||||
            qs = self.at(station, date)
 | 
			
		||||
        else:
 | 
			
		||||
            qs = self
 | 
			
		||||
 | 
			
		||||
        qs = qs.filter(
 | 
			
		||||
            type__in = (Log.Type.start, Log.Type.on_air), **kwargs
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if not archives and station.dealer:
 | 
			
		||||
            qs = qs.exclude(
 | 
			
		||||
@ -1230,6 +1242,73 @@ class LogManager(models.Manager):
 | 
			
		||||
            )
 | 
			
		||||
        return qs.order_by('date')
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_archive_path(station, date):
 | 
			
		||||
        # note: station name is not included in order to avoid problems
 | 
			
		||||
        #       of retrieving archive when it changes
 | 
			
		||||
        return os.path.join(
 | 
			
		||||
            settings.AIRCOX_LOGS_ARCHIVES_DIR,
 | 
			
		||||
            # FIXME: number format
 | 
			
		||||
            '{}{}{}_{}.log.gz'.format(
 | 
			
		||||
                date.year, date.month, date.day, station.pk
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def load_archive(self, station, date):
 | 
			
		||||
        """
 | 
			
		||||
        Return archived logs for a specific date as a list
 | 
			
		||||
        """
 | 
			
		||||
        import yaml
 | 
			
		||||
        import gzip
 | 
			
		||||
 | 
			
		||||
        path = self._get_archive_path(station, date)
 | 
			
		||||
        if not os.path.exists(path):
 | 
			
		||||
            return []
 | 
			
		||||
 | 
			
		||||
        with gzip.open(path, 'rb') as archive:
 | 
			
		||||
            data = archive.read()
 | 
			
		||||
            logs = yaml.load(data)
 | 
			
		||||
            return logs
 | 
			
		||||
 | 
			
		||||
    def make_archive(self, station, date, force = False, keep = False):
 | 
			
		||||
        """
 | 
			
		||||
        Archive logs of the given date. If the archive exists, it does
 | 
			
		||||
        not overwrite it except if "force" is given. In this case, the
 | 
			
		||||
        new elements will be appended to the existing archives.
 | 
			
		||||
 | 
			
		||||
        Return the number of archived logs, -1 if archive could not be
 | 
			
		||||
        created.
 | 
			
		||||
        """
 | 
			
		||||
        import yaml
 | 
			
		||||
        import gzip
 | 
			
		||||
 | 
			
		||||
        os.makedirs(settings.AIRCOX_LOGS_ARCHIVES_DIR, exist_ok = True)
 | 
			
		||||
        path = self._get_archive_path(station, date);
 | 
			
		||||
        if os.path.exists(path) and not force:
 | 
			
		||||
            return -1
 | 
			
		||||
 | 
			
		||||
        qs = self.at(station, date)
 | 
			
		||||
        if not qs.exists():
 | 
			
		||||
            return 0
 | 
			
		||||
 | 
			
		||||
        fields = Log._meta.get_fields()
 | 
			
		||||
        logs = [
 | 
			
		||||
            {
 | 
			
		||||
                i.attname: getattr(log, i.attname)
 | 
			
		||||
                for i in fields
 | 
			
		||||
            }
 | 
			
		||||
            for log in qs
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        with gzip.open(path, 'ab') as archive:
 | 
			
		||||
            data = yaml.dump(logs).encode('utf8')
 | 
			
		||||
            archive.write(data)
 | 
			
		||||
            # TODO: delete logs
 | 
			
		||||
 | 
			
		||||
        if not keep:
 | 
			
		||||
            qs.delete()
 | 
			
		||||
        return len(logs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Log(models.Model):
 | 
			
		||||
    """
 | 
			
		||||
@ -1241,20 +1320,25 @@ class Log(models.Model):
 | 
			
		||||
    class Type(IntEnum):
 | 
			
		||||
        stop = 0x00
 | 
			
		||||
        """
 | 
			
		||||
        Source has been stopped (only when there is no more sound)
 | 
			
		||||
        Source has been stopped, e.g. manually
 | 
			
		||||
        """
 | 
			
		||||
        play = 0x01
 | 
			
		||||
        start = 0x01
 | 
			
		||||
        """
 | 
			
		||||
        The related item has been started by the streamer or manually,
 | 
			
		||||
        and occured on air.
 | 
			
		||||
        The diffusion or sound has been triggered by the streamer or
 | 
			
		||||
        manually.
 | 
			
		||||
        """
 | 
			
		||||
        load = 0x02
 | 
			
		||||
        """
 | 
			
		||||
        Source starts to be preload related_object
 | 
			
		||||
        A playlist has updated, and loading started. A related Diffusion
 | 
			
		||||
        does not means that the playlist is only for it (e.g. after a
 | 
			
		||||
        crash, it can reload previous remaining sound files + thoses of
 | 
			
		||||
        the next diffusion)
 | 
			
		||||
        """
 | 
			
		||||
        on_air = 0x03
 | 
			
		||||
        """
 | 
			
		||||
        The related item has been detected occuring on air
 | 
			
		||||
        The sound or diffusion has been detected occurring on air. Can
 | 
			
		||||
        also designate live diffusion, although Liquidsoap did not play
 | 
			
		||||
        them since they don't have an attached sound archive.
 | 
			
		||||
        """
 | 
			
		||||
        other = 0x04
 | 
			
		||||
        """
 | 
			
		||||
@ -1336,9 +1420,15 @@ class Log(models.Model):
 | 
			
		||||
        Return True if the log is expired. Note that it only check
 | 
			
		||||
        against the date, so it is still possible that the expiration
 | 
			
		||||
        occured because of a Stop or other source.
 | 
			
		||||
 | 
			
		||||
        For sound logs, also check against sound duration when
 | 
			
		||||
        end == date (e.g after a crash)
 | 
			
		||||
        """
 | 
			
		||||
        date = utils.date_or_default(date)
 | 
			
		||||
        return self.end < date
 | 
			
		||||
        end = self.end
 | 
			
		||||
        if end == self.date and self.sound:
 | 
			
		||||
            end = self.date + to_timedelta(self.sound.duration)
 | 
			
		||||
        return end < date
 | 
			
		||||
 | 
			
		||||
    def print(self):
 | 
			
		||||
        r = []
 | 
			
		||||
@ -1349,15 +1439,18 @@ class Log(models.Model):
 | 
			
		||||
        if self.track:
 | 
			
		||||
            r.append('track: ' + str(self.track_id))
 | 
			
		||||
 | 
			
		||||
        logger.info('log #%s: %s%s',
 | 
			
		||||
        logger.info('log %s: %s%s',
 | 
			
		||||
            str(self),
 | 
			
		||||
            self.comment or '',
 | 
			
		||||
            ' (' + ', '.join(r) + ')' if r else ''
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return '#{} ({}, {})'.format(
 | 
			
		||||
                self.pk, self.date.strftime('%Y/%m/%d %H:%M'), self.source
 | 
			
		||||
        return '#{} ({}, {}, {})'.format(
 | 
			
		||||
                self.pk,
 | 
			
		||||
                self.get_type_display(),
 | 
			
		||||
                self.source,
 | 
			
		||||
                self.date.strftime('%Y/%m/%d %H:%M'),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def save(self, *args, **kwargs):
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user