work on archiver
This commit is contained in:
		@ -2,15 +2,16 @@
 | 
			
		||||
Handle archiving of logs in order to keep database light and fast. The
 | 
			
		||||
logs are archived in gzip files, per day.
 | 
			
		||||
"""
 | 
			
		||||
import logging
 | 
			
		||||
from argparse import RawTextHelpFormatter
 | 
			
		||||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
from django.conf import settings as main_settings
 | 
			
		||||
from django.core.management.base import BaseCommand, CommandError
 | 
			
		||||
from django.core.management.base import BaseCommand
 | 
			
		||||
from django.utils import timezone as tz
 | 
			
		||||
 | 
			
		||||
import aircox.settings as settings
 | 
			
		||||
from aircox.models import Log, Station
 | 
			
		||||
from aircox.models.log import LogArchiver
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger('aircox.commands')
 | 
			
		||||
 | 
			
		||||
@ -27,30 +28,14 @@ class Command (BaseCommand):
 | 
			
		||||
            help='minimal age in days of logs to archive. Default is '
 | 
			
		||||
                 'settings.AIRCOX_LOGS_ARCHIVES_AGE'
 | 
			
		||||
        )
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '-f', '--force', action='store_true',
 | 
			
		||||
            help='if an archive exists yet, force it to be updated'
 | 
			
		||||
        )
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '-k', '--keep', action='store_true',
 | 
			
		||||
            help='keep logs in database instead of deleting them'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def handle(self, *args, age, force, keep, **options):
        """Archive logs day per day, walking backwards in time.

        Starts `age` days before now and keeps archiving older days as
        long as earlier logs remain in the database.
        """
        current = tz.now() - tz.timedelta(days=age)
        while True:
            # archives are per calendar day: normalize to midnight
            current = current.replace(hour=0, minute=0, second=0,
                                      microsecond=0)

            logger.info('archive log at date %s', current)
            for station in Station.objects.all():
                Log.objects.make_archive(station, current,
                                         force=force, keep=keep)

            # jump to the most recent day that still has older logs;
            # stop when nothing earlier remains
            remaining = Log.objects.filter(date__lt=current)
            if not remaining.exists():
                break
            current = remaining.order_by('-date').first().date
 | 
			
		||||
    def handle(self, *args, age, keep, **options):
        """Archive all logs dated `age` days ago or earlier.

        `keep` (from the `-k/--keep` option) leaves archived logs in the
        database instead of deleting them.
        """
        date = datetime.date.today() - tz.timedelta(days=age)
        # FIXME: mysql support?
        logger.info('archive logs for %s and earlier', date)
        # fix: forward `keep` — previously the option was parsed but
        # ignored, so logs were deleted even with --keep
        count = LogArchiver().archive(
            Log.objects.filter(date__date__lte=date), keep=keep)
        logger.info('total log archived %d', count)
 | 
			
		||||
 | 
			
		||||
@ -1,13 +1,16 @@
 | 
			
		||||
from collections import deque
 | 
			
		||||
import datetime
 | 
			
		||||
import gzip
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
from django.db import models
 | 
			
		||||
from django.utils import timezone as tz
 | 
			
		||||
from django.utils.functional import cached_property
 | 
			
		||||
from django.utils.translation import gettext_lazy as _
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from aircox import settings
 | 
			
		||||
from .episode import Diffusion
 | 
			
		||||
from .sound import Sound, Track
 | 
			
		||||
@ -17,7 +20,7 @@ from .station import Station
 | 
			
		||||
logger = logging.getLogger('aircox')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
__all__ = ['Log', 'LogQuerySet']
 | 
			
		||||
__all__ = ['Log', 'LogQuerySet', 'LogArchiver']
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LogQuerySet(models.QuerySet):
 | 
			
		||||
@ -52,109 +55,6 @@ class LogQuerySet(models.QuerySet):
 | 
			
		||||
    def with_track(self, with_it=True):
        """Keep logs that have a related track (or lack one when
        `with_it` is False)."""
        return self.filter(track__isnull=(not with_it))
 | 
			
		||||
 | 
			
		||||
    @staticmethod
    def _get_archive_path(station, date):
        """Return the gzip archive path for `station` on `date`."""
        filename = '{}_{}.log.gz'.format(date.strftime("%Y%m%d"),
                                         station.pk)
        return os.path.join(settings.AIRCOX_LOGS_ARCHIVES_DIR, filename)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
    def _get_rel_objects(logs, type, attr):
        """
        From a list of dict representing logs, retrieve related objects
        of the given type, as a dict of `{pk: object}`.

        Example: _get_rel_objects([{..},..], Diffusion, 'diffusion')
        """
        attr_id = attr + '_id'
        pks = (log[attr_id] for log in logs if attr_id in log)
        return {rel.pk: rel for rel in type.objects.filter(pk__in=pks)}
 | 
			
		||||
 | 
			
		||||
    def load_archive(self, station, date):
        """
        Return archived logs for a specific date as a list of unsaved
        Log instances. Return an empty list when no archive exists.
        """
        import gzip

        import yaml

        path = self._get_archive_path(station, date)

        if not os.path.exists(path):
            return []

        with gzip.open(path, 'rb') as archive:
            data = archive.read()
            # fix: plain yaml.load without a Loader is deprecated
            # (PyYAML >= 5.1) and unsafe; archives only hold plain field
            # values, which SafeLoader handles.
            logs = yaml.load(data, Loader=yaml.SafeLoader)

            # we need to preload diffusions, sounds and tracks
            rels = {
                'diffusion': self._get_rel_objects(logs, Diffusion, 'diffusion'),
                'sound': self._get_rel_objects(logs, Sound, 'sound'),
                'track': self._get_rel_objects(logs, Track, 'track'),
            }

            def rel_obj(log, attr):
                rel_id = log.get(attr + '_id')
                return rels[attr][rel_id] if rel_id else None

            return [
                Log(diffusion=rel_obj(log, 'diffusion'),
                    sound=rel_obj(log, 'sound'),
                    track=rel_obj(log, 'track'),
                    **log)

                for log in logs
            ]
 | 
			
		||||
 | 
			
		||||
    def make_archive(self, station, date, force=False, keep=False):
        """
        Archive logs of the given date. If the archive exists, it does
        not overwrite it except if "force" is given. In this case, the
        new elements will be appended to the existing archives.

        Return the number of archived logs, -1 if archive could not be
        created.
        """
        import gzip

        import yaml

        os.makedirs(settings.AIRCOX_LOGS_ARCHIVES_DIR, exist_ok=True)
        path = self._get_archive_path(station, date)
        if os.path.exists(path) and not force:
            return -1

        qs = self.station(station).date(date)
        if not qs.exists():
            return 0

        fields = Log._meta.get_fields()
        serialized = [
            {field.attname: getattr(log, field.attname)
             for field in fields}
            for log in qs
        ]

        # Yaml documents concatenate cleanly, so appending to an
        # existing archive still yields a valid file <3
        with gzip.open(path, 'ab') as archive:
            archive.write(yaml.dump(serialized).encode('utf8'))

        if not keep:
            qs.delete()

        return len(serialized)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Log(models.Model):
 | 
			
		||||
    """
 | 
			
		||||
@ -305,3 +205,101 @@ class Log(models.Model):
 | 
			
		||||
        logger.info('log %s: %s%s', str(self), self.comment or '',
 | 
			
		||||
                    ' (' + ', '.join(r) + ')' if r else '')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LogArchiver:
    """ Commodity class used to manage gzipped yaml archives of logs,
    one file per station and day. """
    @cached_property
    def fields(self):
        # cached once per archiver instance
        return Log._meta.get_fields()

    @staticmethod
    def get_path(station, date):
        """ Return the archive path for `station` on `date`. """
        return os.path.join(
            settings.AIRCOX_LOGS_ARCHIVES_DIR,
            '{}_{}.log.gz'.format(date.strftime("%Y%m%d"), station.pk)
        )

    def archive(self, qs, keep=False):
        """
        Archive logs of the given queryset. Delete archived logs if not
        `keep`. Return the count of archived logs
        """
        if not qs.exists():
            return 0

        os.makedirs(settings.AIRCOX_LOGS_ARCHIVES_DIR, exist_ok=True)
        count = qs.count()
        sorted_logs = self.sort_logs(qs)

        # Note: since we use Yaml, we can just append new logs when file
        # exists yet <3
        for (station, date), logs in sorted_logs.items():
            path = self.get_path(station, date)
            with gzip.open(path, 'ab') as archive:
                data = yaml.dump([self.serialize(l) for l in logs]).encode('utf8')
                archive.write(data)

        if not keep:
            qs.delete()

        return count

    @staticmethod
    def sort_logs(qs):
        """
        Sort logs by station and date and return a dict of
        `{ (station,date): [logs] }`.
        """
        qs = qs.order_by('date')
        logs = {}
        for log in qs:
            date = log.date
            # fix: group by calendar day — archives are per station and
            # day, but `log.date` is a datetime, so keying on it made one
            # group (and one gzip append) per distinct timestamp.
            if isinstance(date, datetime.datetime):
                date = date.date()
            logs.setdefault((log.station, date), []).append(log)
        return logs

    def serialize(self, log):
        """ Serialize log into a dict of `{attname: value}`. """
        return {i.attname: getattr(log, i.attname)
                for i in self.fields}

    def load(self, station, date):
        """ Load an archive returning logs in a list. """
        path = self.get_path(station, date)

        if not os.path.exists(path):
            return []

        with gzip.open(path, 'rb') as archive:
            data = archive.read()
            # fix: plain yaml.load without a Loader is deprecated
            # (PyYAML >= 5.1) and unsafe on file data; archives only
            # hold plain field values, which SafeLoader handles.
            logs = yaml.load(data, Loader=yaml.SafeLoader)

            # we need to preload diffusions, sounds and tracks
            rels = {
                'diffusion': self.get_relations(logs, Diffusion, 'diffusion'),
                'sound': self.get_relations(logs, Sound, 'sound'),
                'track': self.get_relations(logs, Track, 'track'),
            }

            def rel_obj(log, attr):
                rel_id = log.get(attr + '_id')
                return rels[attr][rel_id] if rel_id else None

            return [Log(diffusion=rel_obj(log, 'diffusion'),
                        sound=rel_obj(log, 'sound'),
                        track=rel_obj(log, 'track'),
                        **log) for log in logs]

    @staticmethod
    def get_relations(logs, model, attr):
        """
        From a list of dict representing logs, retrieve related objects
        of the given type, as a dict of `{pk: object}`.
        """
        attr_id = attr + '_id'
        pks = (log[attr_id] for log in logs if attr_id in log)
        return {rel.pk: rel for rel in model.objects.filter(pk__in=pks)}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -92,14 +92,13 @@ class BasePage(models.Model):
 | 
			
		||||
        return '{}'.format(self.title or self.pk)
 | 
			
		||||
 | 
			
		||||
    def save(self, *args, **kwargs):
        # TODO: bleach clean
        if not self.slug:
            # derive a slug from the title, de-duplicated with a counter
            self.slug = slugify(self.title)[:100]
            count = Page.objects.filter(slug__startswith=self.slug).count()
            if count:
                self.slug += '-' + str(count)

        if self.parent and not self.cover:
            # default to the parent's cover
            self.cover = self.parent.cover
        super().save(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
@ -161,6 +160,9 @@ class Page(BasePage):
 | 
			
		||||
            self.pub_date = tz.now()
 | 
			
		||||
        elif not self.is_published:
 | 
			
		||||
            self.pub_date = None
 | 
			
		||||
 | 
			
		||||
        if self.parent and not self.category:
 | 
			
		||||
            self.category = self.parent.category
 | 
			
		||||
        super().save(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,10 +1,50 @@
 | 
			
		||||
# Code inspired from rest_framework of course.
 | 
			
		||||
import os
 | 
			
		||||
import stat
 | 
			
		||||
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#class BaseSettings:
 | 
			
		||||
#    def __init__(self, user_conf):
 | 
			
		||||
#        if user_conf:
 | 
			
		||||
#            for key, value in user_conf.items():
 | 
			
		||||
#                setattr(self, key, value)
 | 
			
		||||
#
 | 
			
		||||
#class Settings(BaseSettings):
 | 
			
		||||
#    default_user_groups = {
 | 
			
		||||
#
 | 
			
		||||
#    }
 | 
			
		||||
#
 | 
			
		||||
#    programs_dir = os.path.join(settings.MEDIA_ROOT, 'programs'),
 | 
			
		||||
#    """ Programs data directory. """
 | 
			
		||||
#    episode_title = '{program.title} - {date}'
 | 
			
		||||
#    """ Default episodes title. """
 | 
			
		||||
#    episode_title_date_format = '%-d %B %Y'
 | 
			
		||||
#    """ Date format used in episode title. """
 | 
			
		||||
#
 | 
			
		||||
#    logs_archives_dir = os.path.join(settings.PROJECT_ROOT, 'logs/archives')
 | 
			
		||||
#    """ Directory where logs are saved once archived """
 | 
			
		||||
#    logs_archive_age = 30
 | 
			
		||||
#    """ Default age of log before being archived """
 | 
			
		||||
#
 | 
			
		||||
#    sounds_default_dir = os.path.join(settings.MEDIA_ROOT, 'programs/defaults')
 | 
			
		||||
#    sound_archive_dir = 'archives'
 | 
			
		||||
#    sound_excerpt_dir = 'excerpts'
 | 
			
		||||
#    sound_auto_chmod = True
 | 
			
		||||
#    sound_chmod_flags = (stat.S_IRWXU, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH)
 | 
			
		||||
#    sound_quality = {
 | 
			
		||||
#        'attribute': 'RMS lev dB',
 | 
			
		||||
#        'range': (-18.0, -8.0),
 | 
			
		||||
#        'sample_length': 120,
 | 
			
		||||
#    }
 | 
			
		||||
#    sound_ext = ('.ogg', '.flac', '.wav', '.mp3', '.opus')
 | 
			
		||||
#
 | 
			
		||||
#    # TODO: move into aircox_streamer
 | 
			
		||||
#    streamer_working_dir = '/tmp/aircox'
 | 
			
		||||
#
 | 
			
		||||
#
 | 
			
		||||
#
 | 
			
		||||
 | 
			
		||||
def ensure(key, default):
    """Export `key` at module level, preferring the value declared in
    the django settings over `default`."""
    value = getattr(settings, key, default)
    globals()[key] = value
 | 
			
		||||
 | 
			
		||||
@ -16,12 +56,22 @@ def ensure(key, default):
 | 
			
		||||
# to add to each group.
 | 
			
		||||
ensure('AIRCOX_DEFAULT_USER_GROUPS', {
 | 
			
		||||
    'radio hosts': (
 | 
			
		||||
        # TODO include content_type in order to avoid clash with potential
 | 
			
		||||
        #      extra applications
 | 
			
		||||
 | 
			
		||||
        # aircox
 | 
			
		||||
        'change_program', 'change_episode', 'change_diffusion',
 | 
			
		||||
        'add_comment', 'change_comment', 'delete_comment',
 | 
			
		||||
        'add_article', 'change_article', 'delete_article',
 | 
			
		||||
        'change_sound',
 | 
			
		||||
        'add_track', 'change_track', 'delete_track',
 | 
			
		||||
 | 
			
		||||
        # taggit
 | 
			
		||||
        'add_tag', 'change_tag', 'delete_tag',
 | 
			
		||||
 | 
			
		||||
        # filer
 | 
			
		||||
        'add_folder', 'change_folder', 'delete_folder', 'can_use_directory_listing',
 | 
			
		||||
        'add_image', 'change_image', 'delete_image',
 | 
			
		||||
    ),
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
@ -29,10 +79,6 @@ ensure('AIRCOX_DEFAULT_USER_GROUPS', {
 | 
			
		||||
ensure('AIRCOX_PROGRAMS_DIR',
 | 
			
		||||
       os.path.join(settings.MEDIA_ROOT, 'programs'))
 | 
			
		||||
 | 
			
		||||
# Directory for working data
 | 
			
		||||
ensure('AIRCOX_DATA_DIR',
 | 
			
		||||
       os.path.join(settings.PROJECT_ROOT, 'data'))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
########################################################################
 | 
			
		||||
# Programs & Episodes
 | 
			
		||||
@ -46,9 +92,7 @@ ensure('AIRCOX_EPISODE_TITLE_DATE_FORMAT', '%-d %B %Y')
 | 
			
		||||
# Logs & Archives
 | 
			
		||||
########################################################################
 | 
			
		||||
# Directory where to save logs' archives
 | 
			
		||||
ensure('AIRCOX_LOGS_ARCHIVES_DIR',
 | 
			
		||||
       os.path.join(AIRCOX_DATA_DIR, 'episodes')
 | 
			
		||||
       )
 | 
			
		||||
ensure('AIRCOX_LOGS_ARCHIVES_DIR', os.path.join(settings.PROJECT_ROOT, 'logs/archives'))
 | 
			
		||||
# In days, minimal age of a log before it is archived
 | 
			
		||||
ensure('AIRCOX_LOGS_ARCHIVES_AGE', 60)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user