diff --git a/aircox/controllers/README.md b/aircox/controllers/README.md new file mode 100644 index 0000000..0747843 --- /dev/null +++ b/aircox/controllers/README.md @@ -0,0 +1,8 @@ +# aircox.controllers +This module provides the following controller classes: +- `log_archiver.LogArchiver`: dump and load gzip archives from Log models. +- `sound_file.SoundFile`: handle synchronisation between filesystem and database for a sound file. +- `sound_monitor.SoundMonitor`: monitor filesystem for changes on audio files and synchronise database. +- `sound_stats.SoundStats` (+ `SoxStats`): get audio statistics of an audio file using Sox. +- `diffusions.Diffusions`: generate, update and clean diffusions. +- `playlist_import.PlaylistImport`: import playlists from CSV. diff --git a/aircox/controllers/__init__.py b/aircox/controllers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aircox/controllers/diffusions.py b/aircox/controllers/diffusions.py new file mode 100644 index 0000000..2527ecb --- /dev/null +++ b/aircox/controllers/diffusions.py @@ -0,0 +1,58 @@ +import datetime +import logging + +from django.db import transaction + +from aircox.models import Diffusion, Schedule + +logger = logging.getLogger("aircox.commands") + + +__all__ = ("Diffusions",) + + +class Diffusions: + """Handle generation and update of Diffusion instances.""" + + date = None + + def __init__(self, date): + self.date = date or datetime.date.today() + + def update(self): + episodes, diffusions = [], [] + for schedule in Schedule.objects.filter( + program__active=True, initial__isnull=True + ): + eps, diffs = schedule.diffusions_of_month(self.date) + if eps: + episodes += eps + if diffs: + diffusions += diffs + + logger.info( + "[update] %s: %d episodes, %d diffusions and reruns", + str(schedule), + len(eps), + len(diffs), + ) + + with transaction.atomic(): + logger.info( + "[update] save %d episodes and %d diffusions", + len(episodes), + len(diffusions), + ) + for episode in episodes: 
+ episode.save() + for diffusion in diffusions: + # force episode id's update + diffusion.episode = diffusion.episode + diffusion.save() + + def clean(self): + qs = Diffusion.objects.filter( + type=Diffusion.TYPE_UNCONFIRMED, start__lt=self.date + ) + logger.info("[clean] %d diffusions will be removed", qs.count()) + qs.delete() diff --git a/aircox/controllers/log_archiver.py b/aircox/controllers/log_archiver.py new file mode 100644 index 0000000..9984350 --- /dev/null +++ b/aircox/controllers/log_archiver.py @@ -0,0 +1,114 @@ +import gzip +import os + +import yaml +from django.utils.functional import cached_property + +from aircox.conf import settings +from aircox.models import Diffusion, Sound, Track, Log + + +__all__ = ("LogArchiver",) + + +class LogArchiver: + """Commodity class used to manage archives of logs.""" + + @cached_property + def fields(self): + return Log._meta.get_fields() + + @staticmethod + def get_path(station, date): + return os.path.join( + settings.LOGS_ARCHIVES_DIR_ABS, + "{}_{}.log.gz".format(date.strftime("%Y%m%d"), station.pk), + ) + + def archive(self, qs, keep=False): + """Archive logs of the given queryset. + + Delete archived logs if not `keep`. 
Return the count of archived + logs + """ + if not qs.exists(): + return 0 + + os.makedirs(settings.LOGS_ARCHIVES_DIR_ABS, exist_ok=True) + count = qs.count() + logs = self.sort_logs(qs) + + # Note: since we use Yaml, we can just append new logs when file + # exists yet <3 + for (station, date), logs in logs.items(): + path = self.get_path(station, date) + with gzip.open(path, "ab") as archive: + data = yaml.dump( + [self.serialize(line) for line in logs] + ).encode("utf8") + archive.write(data) + + if not keep: + qs.delete() + + return count + + @staticmethod + def sort_logs(qs): + """Sort logs by station and date and return a dict of `{ + (station,date): [logs] }`.""" + qs = qs.order_by("date") + logs = {} + for log in qs: + key = (log.station, log.date) + if key not in logs: + logs[key] = [log] + else: + logs[key].append(log) + return logs + + def serialize(self, log): + """Serialize log.""" + return {i.attname: getattr(log, i.attname) for i in self.fields} + + def load(self, station, date): + """Load an archive returning logs in a list.""" + from aircox.models import Log + + path = self.get_path(station, date) + + if not os.path.exists(path): + return [] + + with gzip.open(path, "rb") as archive: + data = archive.read() + logs = yaml.load(data) + + # we need to preload diffusions, sounds and tracks + rels = { + "diffusion": self.get_relations(logs, Diffusion, "diffusion"), + "sound": self.get_relations(logs, Sound, "sound"), + "track": self.get_relations(logs, Track, "track"), + } + + def rel_obj(log, attr): + rel_id = log.get(attr + "_id") + return rels[attr][rel_id] if rel_id else None + + return [ + Log( + diffusion=rel_obj(log, "diffusion"), + sound=rel_obj(log, "sound"), + track=rel_obj(log, "track"), + **log + ) + for log in logs + ] + + @staticmethod + def get_relations(logs, model, attr): + """From a list of dict representing logs, retrieve related objects of + the given type.""" + attr_id = attr + "_id" + pks = (log[attr_id] for log in logs if attr_id 
in log) + return {rel.pk: rel for rel in model.objects.filter(pk__in=pks)} diff --git a/aircox/controllers/playlist_import.py b/aircox/controllers/playlist_import.py new file mode 100644 index 0000000..d0d7441 --- /dev/null +++ b/aircox/controllers/playlist_import.py @@ -0,0 +1,117 @@ +import csv +import logging +import os + + +from aircox.conf import settings +from aircox.models import Track + + +__all__ = ("PlaylistImport",) + + +logger = logging.getLogger("aircox.commands") + + +class PlaylistImport: + """Import one or more playlist for the given sound. Attach it to the + provided sound. + + Playlists are in CSV format, where columns are separated with a + '{settings.IMPORT_PLAYLIST_CSV_DELIMITER}'. Text quote is + {settings.IMPORT_PLAYLIST_CSV_TEXT_QUOTE}. + + If 'minutes' or 'seconds' are given, position will be expressed as timed + position, instead of position in playlist. + """ + + path = None + data = None + tracks = None + track_kwargs = {} + + def __init__(self, path=None, **track_kwargs): + self.path = path + self.track_kwargs = track_kwargs + + def reset(self): + self.data = None + self.tracks = None + + def run(self): + self.read() + if self.track_kwargs.get("sound") is not None: + self.make_playlist() + + def read(self): + if not os.path.exists(self.path): + return True + with open(self.path, "r") as file: + logger.info("start reading csv " + self.path) + self.data = list( + csv.DictReader( + ( + row + for row in file + if not ( + row.startswith("#") or row.startswith("\ufeff#") + ) + and row.strip() + ), + fieldnames=settings.IMPORT_PLAYLIST_CSV_COLS, + delimiter=settings.IMPORT_PLAYLIST_CSV_DELIMITER, + quotechar=settings.IMPORT_PLAYLIST_CSV_TEXT_QUOTE, + ) + ) + + def make_playlist(self): + """Make a playlist from the read data, and return it. + + If save is true, save it into the database + """ + if self.track_kwargs.get("sound") is None: + logger.error( + "related track's sound is missing. Skip import of " + + self.path + + "." 
+ ) + return + + maps = settings.IMPORT_PLAYLIST_CSV_COLS + tracks = [] + + logger.info("parse csv file " + self.path) + has_timestamp = ("minutes" or "seconds") in maps + for index, line in enumerate(self.data): + if ("title" or "artist") not in line: + return + try: + timestamp = ( + int(line.get("minutes") or 0) * 60 + + int(line.get("seconds") or 0) + if has_timestamp + else None + ) + + track, created = Track.objects.get_or_create( + title=line.get("title"), + artist=line.get("artist"), + position=index, + **self.track_kwargs + ) + track.timestamp = timestamp + track.info = line.get("info") + tags = line.get("tags") + if tags: + track.tags.add(*tags.lower().split(",")) + except Exception as err: + logger.warning( + "an error occured for track {index}, it may not " + "have been saved: {err}".format(index=index, err=err) + ) + continue + + track.save() + tracks.append(track) + self.tracks = tracks + return tracks diff --git a/aircox/management/sound_file.py b/aircox/controllers/sound_file.py similarity index 99% rename from aircox/management/sound_file.py rename to aircox/controllers/sound_file.py index adba6db..c2db65a 100644 --- a/aircox/management/sound_file.py +++ b/aircox/controllers/sound_file.py @@ -33,7 +33,7 @@ from django.utils.translation import gettext as _ from aircox import utils from aircox.models import Program, Sound, Track -from .commands.import_playlist import PlaylistImport +from .playlist_import import PlaylistImport logger = logging.getLogger("aircox.commands") diff --git a/aircox/controllers/sound_monitor.py b/aircox/controllers/sound_monitor.py new file mode 100644 index 0000000..ba66420 --- /dev/null +++ b/aircox/controllers/sound_monitor.py @@ -0,0 +1,311 @@ +#! 
/usr/bin/env python3 + +"""Monitor sound files; For each program, check for: + +- new files; +- deleted files; +- differences between files and sound; +- quality of the files; + +It tries to parse the file name to get the date of the diffusion of an +episode and associate the file with it; We use the following format: + yyyymmdd[_n][_][name] + +Where: + 'yyyy' the year of the episode's diffusion; + 'mm' the month of the episode's diffusion; + 'dd' the day of the episode's diffusion; + 'n' the number of the episode (if multiple episodes); + 'name' the title of the sound; + + +To check quality of files, call the command sound_quality_check using the +parameters given by the setting SOUND_QUALITY. This script requires +Sox (and soxi). +""" +import atexit +import concurrent.futures as futures +import logging +import time +import os + +# from datetime import datetime, timedelta + +from django.utils.timezone import datetime, timedelta + +from watchdog.observers import Observer +from watchdog.events import PatternMatchingEventHandler + +from aircox.conf import settings +from aircox.models import Sound, Program + +from .sound_file import SoundFile + + +# FIXME: logger should be different in used classes (e.g. "aircox.commands") +# defaulting to logging. +logger = logging.getLogger("aircox.commands") + + +__all__ = ( + "Task", + "CreateTask", + "DeleteTask", + "MoveTask", + "ModifiedTask", + "MonitorHandler", +) + + +class Task: + """Base class used to execute a specific task on file change event. + + Handlers are sent to a multithread pool. 
+ """ + + future = None + """Future that promised the handler's call.""" + log_msg = None + """Log message to display on event happens.""" + timestamp = None + """Last ping timestamp (the event happened).""" + + def __init__(self, logger=logging): + self.ping() + + def ping(self): + """""" + self.timestamp = datetime.now() + + def __call__(self, event, path=None, logger=logging, **kw): + sound_file = SoundFile(path or event.src_path) + if self.log_msg: + msg = self.log_msg.format(event=event, sound_file=sound_file) + logger.info(msg) + + sound_file.sync(**kw) + return sound_file + + +class CreateTask(Task): + log_msg = "Sound file created: {sound_file.path}" + + +class DeleteTask(Task): + log_msg = "Sound file deleted: {sound_file.path}" + + def __call__(self, *args, **kwargs): + kwargs["deleted"] = True + return super().__call__(*args, **kwargs) + + +class MoveTask(Task): + log_msg = "Sound file moved: {event.src_path} -> {event.dest_path}" + + def __call__(self, event, **kw): + print("event", event.src_path, event.dest_path, Sound.objects.all()) + sound = Sound.objects.filter(file=event.src_path).first() + if sound: + print("got sound", event.src_path) + kw["sound"] = sound + kw["path"] = event.src_path + else: + kw["path"] = event.dest_path + return super().__call__(event, **kw) + + +class ModifiedTask(Task): + timeout_delta = timedelta(seconds=30) + log_msg = "Sound file updated: {sound_file.path}" + + def wait(self): + # multiple call of this handler can be done consecutively, we block + # its thread using timeout + # Note: this method may be subject to some race conflicts, but this + # should not be big a real issue. 
+ timeout = self.timestamp + self.timeout_delta + while datetime.now() < timeout: + time.sleep(self.timeout_delta.total_seconds()) + timeout = self.timestamp + self.timeout_delta + + def __call__(self, event, **kw): + self.wait() + return super().__call__(event, **kw) + + +class MonitorHandler(PatternMatchingEventHandler): + """MonitorHandler is used as a Watchdog event handler. + + It uses a multithread pool in order to execute tasks on events. If a + job already exists for this file and event, it pings existing job + without creating a new one. + """ + + pool = None + jobs = {} + + def __init__(self, subdir, pool, **sync_kw): + """ + :param str subdir: sub-directory in program dirs to monitor \ + (SOUND_ARCHIVES_SUBDIR or SOUND_EXCERPTS_SUBDIR); + :param concurrent.futures.Executor pool: pool executing jobs on file + change; + :param **sync_kw: kwargs passed to `SoundFile.sync`; + """ + self.subdir = subdir + self.pool = pool + self.sync_kw = sync_kw + + patterns = [ + "*/{}/*{}".format(self.subdir, ext) + for ext in settings.SOUND_FILE_EXT + ] + super().__init__(patterns=patterns, ignore_directories=True) + + def on_created(self, event): + self._submit(CreateTask(), event, "new", **self.sync_kw) + + def on_deleted(self, event): + self._submit(DeleteTask(), event, "del") + + def on_moved(self, event): + self._submit(MoveTask(), event, "mv", **self.sync_kw) + + def on_modified(self, event): + self._submit(ModifiedTask(), event, "up", **self.sync_kw) + + def _submit(self, handler, event, job_key_prefix, **kwargs): + """Send handler job to pool if not already running. + + Return tuple with running job and boolean indicating if its a + new one. 
+ """ + key = job_key_prefix + ":" + event.src_path + job = self.jobs.get(key) + if job and not job.future.done(): + job.ping() + return job, False + + handler.future = self.pool.submit(handler, event, **kwargs) + self.jobs[key] = handler + + def done(r): + if self.jobs.get(key) is handler: + del self.jobs[key] + + handler.future.add_done_callback(done) + return handler, True + + +class SoundMonitor: + """Monitor for filesystem changes in order to synchronise database and + analyse files.""" + + def report(self, program=None, component=None, logger=logging, *content): + if not component: + logger.info( + "%s: %s", str(program), " ".join([str(c) for c in content]) + ) + else: + logger.info( + "%s, %s: %s", + str(program), + str(component), + " ".join([str(c) for c in content]), + ) + + def scan(self, logger=logging): + """For all programs, scan dirs.""" + logger.info("scan all programs...") + programs = Program.objects.filter() + + dirs = [] + for program in programs: + logger.info("#%d %s", program.id, program.title) + self.scan_for_program( + program, + settings.SOUND_ARCHIVES_SUBDIR, + logger=logger, + type=Sound.TYPE_ARCHIVE, + ) + self.scan_for_program( + program, + settings.SOUND_EXCERPTS_SUBDIR, + logger=logger, + type=Sound.TYPE_EXCERPT, + ) + dirs.append(os.path.join(program.abspath)) + return dirs + + def scan_for_program( + self, program, subdir, logger=logging, **sound_kwargs + ): + """Scan a given directory that is associated to the given program, and + update sounds information.""" + logger.info("- %s/", subdir) + if not program.ensure_dir(subdir): + return + + subdir = os.path.join(program.abspath, subdir) + sounds = [] + + # sounds in directory + for path in os.listdir(subdir): + path = os.path.join(subdir, path) + if not path.endswith(settings.SOUND_FILE_EXT): + continue + + sound_file = SoundFile(path) + sound_file.sync(program=program, **sound_kwargs) + sounds.append(sound_file.sound.pk) + + # sounds in db & unchecked + sounds = 
Sound.objects.filter(file__startswith=subdir).exclude( + pk__in=sounds + ) + self.check_sounds(sounds, program=program) + + def check_sounds(self, qs, **sync_kwargs): + """Only check for the sound existence or update.""" + # check files + for sound in qs: + if sound.check_on_file(): + SoundFile(sound.file.path).sync(sound=sound, **sync_kwargs) + + def monitor(self, logger=logging): + """Run in monitor mode.""" + with futures.ThreadPoolExecutor() as pool: + archives_handler = MonitorHandler( + settings.SOUND_ARCHIVES_SUBDIR, + pool, + type=Sound.TYPE_ARCHIVE, + logger=logger, + ) + excerpts_handler = MonitorHandler( + settings.SOUND_EXCERPTS_SUBDIR, + pool, + type=Sound.TYPE_EXCERPT, + logger=logger, + ) + + observer = Observer() + observer.schedule( + archives_handler, + settings.PROGRAMS_DIR_ABS, + recursive=True, + ) + observer.schedule( + excerpts_handler, + settings.PROGRAMS_DIR_ABS, + recursive=True, + ) + observer.start() + + def leave(): + observer.stop() + observer.join() + + atexit.register(leave) + + while True: + time.sleep(1) diff --git a/aircox/management/sound_stats.py b/aircox/controllers/sound_stats.py similarity index 100% rename from aircox/management/sound_stats.py rename to aircox/controllers/sound_stats.py diff --git a/aircox/management/commands/diffusions.py b/aircox/management/commands/diffusions.py index d6bce5f..9f4e571 100755 --- a/aircox/management/commands/diffusions.py +++ b/aircox/management/commands/diffusions.py @@ -9,59 +9,13 @@ import logging from argparse import RawTextHelpFormatter from django.core.management.base import BaseCommand -from django.db import transaction from django.utils import timezone as tz -from aircox.models import Diffusion, Schedule +from aircox.controllers.diffusions import Diffusions logger = logging.getLogger("aircox.commands") -class Actions: - date = None - - def __init__(self, date): - self.date = date or datetime.date.today() - - def update(self): - episodes, diffusions = [], [] - for schedule in 
Schedule.objects.filter( - program__active=True, initial__isnull=True - ): - eps, diffs = schedule.diffusions_of_month(self.date) - if eps: - episodes += eps - if diffs: - diffusions += diffs - - logger.info( - "[update] %s: %d episodes, %d diffusions and reruns", - str(schedule), - len(eps), - len(diffs), - ) - - with transaction.atomic(): - logger.info( - "[update] save %d episodes and %d diffusions", - len(episodes), - len(diffusions), - ) - for episode in episodes: - episode.save() - for diffusion in diffusions: - # force episode id's update - diffusion.episode = diffusion.episode - diffusion.save() - - def clean(self): - qs = Diffusion.objects.filter( - type=Diffusion.TYPE_UNCONFIRMED, start__lt=self.date - ) - logger.info("[clean] %d diffusions will be removed", qs.count()) - qs.delete() - - class Command(BaseCommand): help = __doc__ @@ -116,7 +70,7 @@ class Command(BaseCommand): date += tz.timedelta(days=28) date = date.replace(day=1) - actions = Actions(date) + actions = Diffusions(date) if options.get("update"): actions.update() if options.get("clean"): diff --git a/aircox/management/commands/import_playlist.py b/aircox/management/commands/import_playlist.py index 41707f0..8a4c501 100755 --- a/aircox/management/commands/import_playlist.py +++ b/aircox/management/commands/import_playlist.py @@ -9,7 +9,6 @@ The order of the elements is: {settings.IMPORT_PLAYLIST_CSV_COLS} If 'minutes' or 'seconds' are given, position will be expressed as timed position, instead of position in playlist. 
""" -import csv import logging import os from argparse import RawTextHelpFormatter @@ -17,109 +16,19 @@ from argparse import RawTextHelpFormatter from django.core.management.base import BaseCommand from aircox.conf import settings -from aircox.models import Sound, Track +from aircox.models import Sound +from aircox.controllers.playlist_import import PlaylistImport + __doc__ = __doc__.format(settings=settings) -__all__ = ("PlaylistImport", "Command") + +__all__ = ("Command",) logger = logging.getLogger("aircox.commands") -class PlaylistImport: - path = None - data = None - tracks = None - track_kwargs = {} - - def __init__(self, path=None, **track_kwargs): - self.path = path - self.track_kwargs = track_kwargs - - def reset(self): - self.data = None - self.tracks = None - - def run(self): - self.read() - if self.track_kwargs.get("sound") is not None: - self.make_playlist() - - def read(self): - if not os.path.exists(self.path): - return True - with open(self.path, "r") as file: - logger.info("start reading csv " + self.path) - self.data = list( - csv.DictReader( - ( - row - for row in file - if not ( - row.startswith("#") or row.startswith("\ufeff#") - ) - and row.strip() - ), - fieldnames=settings.IMPORT_PLAYLIST_CSV_COLS, - delimiter=settings.IMPORT_PLAYLIST_CSV_DELIMITER, - quotechar=settings.IMPORT_PLAYLIST_CSV_TEXT_QUOTE, - ) - ) - - def make_playlist(self): - """Make a playlist from the read data, and return it. - - If save is true, save it into the database - """ - if self.track_kwargs.get("sound") is None: - logger.error( - "related track's sound is missing. Skip import of " - + self.path - + "." 
- ) - return - - maps = settings.IMPORT_PLAYLIST_CSV_COLS - tracks = [] - - logger.info("parse csv file " + self.path) - has_timestamp = ("minutes" or "seconds") in maps - for index, line in enumerate(self.data): - if ("title" or "artist") not in line: - return - try: - timestamp = ( - int(line.get("minutes") or 0) * 60 - + int(line.get("seconds") or 0) - if has_timestamp - else None - ) - - track, created = Track.objects.get_or_create( - title=line.get("title"), - artist=line.get("artist"), - position=index, - **self.track_kwargs - ) - track.timestamp = timestamp - track.info = line.get("info") - tags = line.get("tags") - if tags: - track.tags.add(*tags.lower().split(",")) - except Exception as err: - logger.warning( - "an error occured for track {index}, it may not " - "have been saved: {err}".format(index=index, err=err) - ) - continue - - track.save() - tracks.append(track) - self.tracks = tracks - return tracks - - class Command(BaseCommand): help = __doc__ diff --git a/aircox/management/commands/sounds_monitor.py b/aircox/management/commands/sounds_monitor.py index 9990b30..8c87643 100755 --- a/aircox/management/commands/sounds_monitor.py +++ b/aircox/management/commands/sounds_monitor.py @@ -1,4 +1,5 @@ #! /usr/bin/env python3 +# TODO: SoundMonitor class """Monitor sound files; For each program, check for: @@ -23,20 +24,12 @@ To check quality of files, call the command sound_quality_check using the parameters given by the setting SOUND_QUALITY. This script requires Sox (and soxi). 
""" -import atexit -import concurrent.futures as futures import logging -import os -import time from argparse import RawTextHelpFormatter from django.core.management.base import BaseCommand -from watchdog.observers import Observer -from aircox.conf import settings -from aircox.management.sound_file import SoundFile -from aircox.management.sound_monitor import MonitorHandler -from aircox.models import Program, Sound +from aircox.controllers.sound_monitor import SoundMonitor logger = logging.getLogger("aircox.commands") @@ -44,109 +37,6 @@ logger = logging.getLogger("aircox.commands") class Command(BaseCommand): help = __doc__ - def report(self, program=None, component=None, *content): - if not component: - logger.info( - "%s: %s", str(program), " ".join([str(c) for c in content]) - ) - else: - logger.info( - "%s, %s: %s", - str(program), - str(component), - " ".join([str(c) for c in content]), - ) - - def scan(self): - """For all programs, scan dirs.""" - logger.info("scan all programs...") - programs = Program.objects.filter() - - dirs = [] - for program in programs: - logger.info("#%d %s", program.id, program.title) - self.scan_for_program( - program, - settings.SOUND_ARCHIVES_SUBDIR, - type=Sound.TYPE_ARCHIVE, - ) - self.scan_for_program( - program, - settings.SOUND_EXCERPTS_SUBDIR, - type=Sound.TYPE_EXCERPT, - ) - dirs.append(os.path.join(program.abspath)) - return dirs - - def scan_for_program(self, program, subdir, **sound_kwargs): - """Scan a given directory that is associated to the given program, and - update sounds information.""" - logger.info("- %s/", subdir) - if not program.ensure_dir(subdir): - return - - subdir = os.path.join(program.abspath, subdir) - sounds = [] - - # sounds in directory - for path in os.listdir(subdir): - path = os.path.join(subdir, path) - if not path.endswith(settings.SOUND_FILE_EXT): - continue - - sound_file = SoundFile(path) - sound_file.sync(program=program, **sound_kwargs) - sounds.append(sound_file.sound.pk) - - # sounds 
in db & unchecked - sounds = Sound.objects.filter(file__startswith=subdir).exclude( - pk__in=sounds - ) - self.check_sounds(sounds, program=program) - - def check_sounds(self, qs, **sync_kwargs): - """Only check for the sound existence or update.""" - # check files - for sound in qs: - if sound.check_on_file(): - SoundFile(sound.file.path).sync(sound=sound, **sync_kwargs) - - def monitor(self): - """Run in monitor mode.""" - with futures.ThreadPoolExecutor() as pool: - archives_handler = MonitorHandler( - settings.SOUND_ARCHIVES_SUBDIR, - pool, - type=Sound.TYPE_ARCHIVE, - ) - excerpts_handler = MonitorHandler( - settings.SOUND_EXCERPTS_SUBDIR, - pool, - type=Sound.TYPE_EXCERPT, - ) - - observer = Observer() - observer.schedule( - archives_handler, - settings.PROGRAMS_DIR_ABS, - recursive=True, - ) - observer.schedule( - excerpts_handler, - settings.PROGRAMS_DIR_ABS, - recursive=True, - ) - observer.start() - - def leave(): - observer.stop() - observer.join() - - atexit.register(leave) - - while True: - time.sleep(1) - def add_arguments(self, parser): parser.formatter_class = RawTextHelpFormatter parser.add_argument( @@ -172,6 +62,7 @@ class Command(BaseCommand): ) def handle(self, *args, **options): + SoundMonitor() if options.get("scan"): self.scan() # if options.get('quality_check'): diff --git a/aircox/management/commands/sounds_quality_check.py b/aircox/management/commands/sounds_quality_check.py index 2015ea8..a69814c 100755 --- a/aircox/management/commands/sounds_quality_check.py +++ b/aircox/management/commands/sounds_quality_check.py @@ -4,7 +4,7 @@ from argparse import RawTextHelpFormatter from django.core.management.base import BaseCommand, CommandError -from aircox.management.sound_stats import SoundStats, SoxStats +from aircox.controllers.sound_stats import SoundStats, SoxStats logger = logging.getLogger("aircox.commands") diff --git a/aircox/management/sound_monitor.py b/aircox/management/sound_monitor.py deleted file mode 100644 index 
dde7b26..0000000 --- a/aircox/management/sound_monitor.py +++ /dev/null @@ -1,171 +0,0 @@ -#! /usr/bin/env python3 - -"""Monitor sound files; For each program, check for: - -- new files; -- deleted files; -- differences between files and sound; -- quality of the files; - -It tries to parse the file name to get the date of the diffusion of an -episode and associate the file with it; WNotifye the following format: - yyyymmdd[_n][_][name] - -Where: - 'yyyy' the year Notifyhe episode's diffusion; - 'mm' the month of the episode's difNotifyon; - 'dd' the day of the episode's diffusion; - 'n' the number of the episode (if multiple episodes); - 'name' the title of the sNotify; - - -To check quality of files, call the command sound_quality_check using the -parameters given by the setting SOUND_QUALITY. This script requires -Sox (and soxi). -""" -import logging -import time -from datetime import datetime, timedelta - -from watchdog.events import PatternMatchingEventHandler - -from aircox.conf import settings -from aircox.models import Sound - -from .sound_file import SoundFile - -logger = logging.getLogger("aircox.commands") - - -__all__ = ( - "NotifyHandler", - "CreateHandler", - "DeleteHandler", - "MoveHandler", - "ModifiedHandler", - "MonitorHandler", -) - - -class NotifyHandler: - future = None - log_msg = None - - def __init__(self): - self.timestamp = datetime.now() - - def ping(self): - self.timestamp = datetime.now() - - def __call__(self, event, path=None, **kw): - sound_file = SoundFile(path or event.src_path) - if self.log_msg: - msg = self.log_msg.format(event=event, sound_file=sound_file) - logger.info(msg) - - sound_file.sync(**kw) - return sound_file - - -class CreateHandler(NotifyHandler): - log_msg = "Sound file created: {sound_file.path}" - - -class DeleteHandler(NotifyHandler): - log_msg = "Sound file deleted: {sound_file.path}" - - def __call__(self, *args, **kwargs): - kwargs["deleted"] = True - return super().__call__(*args, **kwargs) - - -class 
MoveHandler(NotifyHandler): - log_msg = "Sound file moved: {event.src_path} -> {event.dest_path}" - - def __call__(self, event, **kw): - sound = Sound.objects.filter(file=event.src_path) - # FIXME: this is wrong - if sound: - kw["sound"] = sound - kw["path"] = event.src_path - else: - kw["path"] = event.dest_path - return super().__call__(event, **kw) - - -class ModifiedHandler(NotifyHandler): - timeout_delta = timedelta(seconds=30) - log_msg = "Sound file updated: {sound_file.path}" - - def wait(self): - # multiple call of this handler can be done consecutively, we block - # its thread using timeout - # Note: this method may be subject to some race conflicts, but this - # should not be big a real issue. - timeout = self.timestamp + self.timeout_delta - while datetime.now() < timeout: - time.sleep(self.timeout_delta.total_seconds()) - timeout = self.timestamp + self.timeout_delta - - def __call__(self, event, **kw): - self.wait() - return super().__call__(event, **kw) - - -class MonitorHandler(PatternMatchingEventHandler): - """Event handler for watchdog, in order to be used in monitoring.""" - - pool = None - jobs = {} - - def __init__(self, subdir, pool, **sync_kw): - """ - :param str subdir: sub-directory in program dirs to monitor \ - (SOUND_ARCHIVES_SUBDIR or SOUND_EXCERPTS_SUBDIR); - :param concurrent.futures.Executor pool: pool executing jobs on file - change; - :param **sync_kw: kwargs passed to `SoundFile.sync`; - """ - self.subdir = subdir - self.pool = pool - self.sync_kw = sync_kw - - patterns = [ - "*/{}/*{}".format(self.subdir, ext) - for ext in settings.SOUND_FILE_EXT - ] - super().__init__(patterns=patterns, ignore_directories=True) - - def on_created(self, event): - self._submit(CreateHandler(), event, "new", **self.sync_kw) - - def on_deleted(self, event): - self._submit(DeleteHandler(), event, "del") - - def on_moved(self, event): - self._submit(MoveHandler(), event, "mv", **self.sync_kw) - - def on_modified(self, event): - 
self._submit(ModifiedHandler(), event, "up", **self.sync_kw) - - def _submit(self, handler, event, job_key_prefix, **kwargs): - """Send handler job to pool if not already running. - - Return tuple with running job and boolean indicating if its a - new one. - """ - key = job_key_prefix + ":" + event.src_path - job = self.jobs.get(key) - if job and not job.future.done(): - job.ping() - return job, False - - handler.future = self.pool.submit(handler, event, **kwargs) - self.jobs[key] = handler - - def done(r): - if self.jobs.get(key) is handler: - del self.jobs[key] - - handler.future.add_done_callback(done) - return handler, True diff --git a/aircox/models/__init__.py b/aircox/models/__init__.py index d4034a5..246ca3d 100644 --- a/aircox/models/__init__.py +++ b/aircox/models/__init__.py @@ -2,7 +2,7 @@ from . import signals from .article import Article from .diffusion import Diffusion, DiffusionQuerySet from .episode import Episode -from .log import Log, LogArchiver, LogQuerySet +from .log import Log, LogQuerySet from .page import Category, Comment, NavItem, Page, PageQuerySet, StaticPage from .program import Program, ProgramChildQuerySet, ProgramQuerySet, Stream from .schedule import Schedule @@ -18,7 +18,6 @@ __all__ = ( "DiffusionQuerySet", "Log", "LogQuerySet", - "LogArchiver", "Category", "PageQuerySet", "Page", diff --git a/aircox/models/log.py b/aircox/models/log.py index b918a42..8a8b942 100644 --- a/aircox/models/log.py +++ b/aircox/models/log.py @@ -1,18 +1,11 @@ import datetime -import gzip import logging -import os from collections import deque -import yaml from django.db import models from django.utils import timezone as tz -from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ -from aircox.conf import settings - -__all__ = ("Settings", "settings") from .diffusion import Diffusion from .sound import Sound, Track from .station import Station @@ -20,7 +13,7 @@ from .station import Station logger = 
logging.getLogger("aircox") -__all__ = ("Log", "LogQuerySet", "LogArchiver") +__all__ = ("Log", "LogQuerySet") class LogQuerySet(models.QuerySet): @@ -240,104 +233,3 @@ class Log(models.Model): self.comment or "", " (" + ", ".join(r) + ")" if r else "", ) - - -class LogArchiver: - """Commodity class used to manage archives of logs.""" - - @cached_property - def fields(self): - return Log._meta.get_fields() - - @staticmethod - def get_path(station, date): - return os.path.join( - settings.LOGS_ARCHIVES_DIR_ABS, - "{}_{}.log.gz".format(date.strftime("%Y%m%d"), station.pk), - ) - - def archive(self, qs, keep=False): - """Archive logs of the given queryset. - - Delete archived logs if not `keep`. Return the count of archived - logs - """ - if not qs.exists(): - return 0 - - os.makedirs(settings.LOGS_ARCHIVES_DIR_ABS, exist_ok=True) - count = qs.count() - logs = self.sort_logs(qs) - - # Note: since we use Yaml, we can just append new logs when file - # exists yet <3 - for (station, date), logs in logs.items(): - path = self.get_path(station, date) - with gzip.open(path, "ab") as archive: - data = yaml.dump( - [self.serialize(line) for line in logs] - ).encode("utf8") - archive.write(data) - - if not keep: - qs.delete() - - return count - - @staticmethod - def sort_logs(qs): - """Sort logs by station and date and return a dict of `{ - (station,date): [logs] }`.""" - qs = qs.order_by("date") - logs = {} - for log in qs: - key = (log.station, log.date) - if key not in logs: - logs[key] = [log] - else: - logs[key].append(log) - return logs - - def serialize(self, log): - """Serialize log.""" - return {i.attname: getattr(log, i.attname) for i in self.fields} - - def load(self, station, date): - """Load an archive returning logs in a list.""" - path = self.get_path(station, date) - - if not os.path.exists(path): - return [] - - with gzip.open(path, "rb") as archive: - data = archive.read() - logs = yaml.load(data) - - # we need to preload diffusions, sounds and tracks - rels 
= { - "diffusion": self.get_relations(logs, Diffusion, "diffusion"), - "sound": self.get_relations(logs, Sound, "sound"), - "track": self.get_relations(logs, Track, "track"), - } - - def rel_obj(log, attr): - rel_id = log.get(attr + "_id") - return rels[attr][rel_id] if rel_id else None - - return [ - Log( - diffusion=rel_obj(log, "diffusion"), - sound=rel_obj(log, "sound"), - track=rel_obj(log, "track"), - **log - ) - for log in logs - ] - - @staticmethod - def get_relations(logs, model, attr): - """From a list of dict representing logs, retrieve related objects of - the given type.""" - attr_id = attr + "_id" - pks = (log[attr_id] for log in logs if attr_id in log) - return {rel.pk: rel for rel in model.objects.filter(pk__in=pks)} diff --git a/aircox/test.py b/aircox/test.py index 0ce71ae..0b92327 100644 --- a/aircox/test.py +++ b/aircox/test.py @@ -1,7 +1,9 @@ """This module provide test utilities.""" +from collections import namedtuple +import inspect -__all__ = ("interface",) +__all__ = ("interface", "Interface") def interface(obj, funcs): @@ -31,3 +33,185 @@ def interface_wrap(obj, attr, value): return value setattr(obj, attr, wrapper) + return wrapper + + +InterfaceTarget = namedtuple( + "InterfaceTarget", + ["target", "namespace", "key"], + defaults=[("namespace", None), ("key", None)], +) + + +class WrapperMixin: + def __init__(self, target=None, ns=None, ns_attr=None, **kwargs): + self.target = target + if ns: + self.inject(ns, ns_attr) + super().__init__(**kwargs) + + @property + def ns_target(self): + if self.ns and self.ns_attr: + return getattr(self.ns, self.ns_attr, None) + return None + + def inject(self, ns=None, ns_attr=None): + if ns and ns_attr: + ns_target = getattr(ns, ns_attr, None) + if self.target is ns_target: + return + elif self.target is not None: + raise RuntimeError( + "self target already injected. It must be " + "`release` before `inject`." 
+ ) + self.target = ns_target + setattr(ns, ns_attr, self.parent) + elif not ns or not ns_attr: + raise ValueError("ns and ns_attr must be provided together") + self.ns = ns + self.ns_attr = ns_attr + + def release(self): + if self.ns_target is self: + setattr(self.target.namespace, self.target.name, self.target) + self.target = None + + +class SpoofMixin: + traces = None + + def __init__(self, funcs=None, **kwargs): + self.reset(funcs or {}) + super().__init__(**kwargs) + + def reset(self, funcs=None): + self.traces = {} + if funcs is not None: + self.funcs = funcs + + def get_trace(self, name, args=False, kw=False): + """Get a function call parameters. + + :param str name: function name + :param bool args: return positional arguments + :param bool kwargs: return named arguments + :returns either a tuple of args, a dict of kwargs, or a tuple \ + of `(args, kwargs)`. + :raises ValueError: the function has been called multiple time. + """ + trace = self.traces[name] + if isinstance(trace, list): + raise ValueError(f"{name} called multiple times.") + return self._get_trace(trace, args=args, kw=kw) + + def get_traces(self, name, args=False, kw=False): + """Get a tuple of all call parameters. + + Parameters are the same as `get()`. 
+ """ + traces = self.traces[name] + if not isinstance(traces, list): + traces = (traces,) + return tuple( + self._get_trace(trace, args=args, kw=kw) for trace in traces + ) + + def _get_trace(self, trace, args=False, kw=False): + if (args and kw) or (not args and not kw): + return trace + elif args: + return trace[0] + elif isinstance(kw, str): + return trace[1][kw] + return trace[1] + + def call(self, name, args, kw): + """Add call for function of provided name, and return predefined + result.""" + self.add(name, args, kw) + return self.get_result(name, args, kw) + + def add(self, name, args, kw): + """Add call parameters to `self.traces` for the function with the + provided `name`.""" + trace = self.traces.get(name) + if trace is None: + self.traces[name] = (args, kw) + elif isinstance(trace, tuple): + self.traces[name] = [trace, (args, kw)] + else: + trace.append((args, kw)) + + def get_result(self, name, a, kw): + """Get result for the function of the provided `name`. + + :raises KeyError: no registered function with this `name`. + """ + func = self.funcs[name] + if callable(func): + return func(*a, **kw) + return func + + +class InterfaceMeta(SpoofMixin, WrapperMixin): + calls = None + """Calls done.""" + + def __init__(self, parent, **kwargs): + self.parent = parent + super().__init__(**kwargs) + + def __getitem__(self, name): + return self.traces[name] + + +class Interface: + _imeta = None + """This contains a InterfaceMeta instance related to Interface one. + + `_imeta` is used to check tests etc. 
+ """ + + def __init__(self, _target=None, _funcs=None, _imeta_kw=None, **kwargs): + if _imeta_kw is None: + _imeta_kw = {} + if _funcs is not None: + _imeta_kw.setdefault("funcs", _funcs) + if _target is not None: + _imeta_kw.setdefault("target", _target) + self._imeta = InterfaceMeta(self, **_imeta_kw) + self.__dict__.update(kwargs) + + @property + def _itarget(self): + return self._imeta.target + + @classmethod + def inject(cls, ns, ns_attr, funcs=None, **kwargs): + kwargs["_imeta_kw"] = {"ns": ns, "ns_attr": ns_attr, "funcs": funcs} + return cls(**kwargs) + + def _irelease(self): + self._imeta.release() + + def _trace(self, *args, **kw): + return self._imeta.get_trace(*args, **kw) + + def _traces(self, *args, **kw): + return self._imeta.get_traces(*args, **kw) + + def __call__(self, *args, **kwargs): + target = self._imeta.target + if inspect.isclass(target): + target = target(*args, **kwargs) + return type(self)(target, _imeta_kw={"funcs": self._imeta.funcs}) + + self._imeta.add("__call__", args, kwargs) + return self._imeta.target(*args, **kwargs) + + def __getattr__(self, attr): + if attr in self._imeta.funcs: + return lambda *args, **kwargs: self._imeta.call(attr, args, kwargs) + return getattr(self._imeta.target, attr) diff --git a/aircox/tests/conftest.py b/aircox/tests/conftest.py index bb93cca..7e6833b 100644 --- a/aircox/tests/conftest.py +++ b/aircox/tests/conftest.py @@ -100,3 +100,8 @@ def podcasts(episodes): sound.file = f"test_sound_{episode.pk}_{i}.mp3" items += sounds return items + + +@pytest.fixture +def sound(program): + return baker.make(models.Sound, file="tmp/test.wav", program=program) diff --git a/aircox/tests/controllers/test_sound_monitor.py b/aircox/tests/controllers/test_sound_monitor.py new file mode 100644 index 0000000..ec6e70a --- /dev/null +++ b/aircox/tests/controllers/test_sound_monitor.py @@ -0,0 +1,136 @@ +import logging +import pytest + +from django.utils import timezone as tz + +from aircox.models import Sound +from 
aircox.controllers import sound_monitor +from aircox.test import Interface + + +now = tz.datetime.now() + + +@pytest.fixture +def event(): + return Interface(src_path="/tmp/src_path", dest_path="/tmp/dest_path") + + +@pytest.fixture +def logger(): + logger = Interface( + logging, {"info": None, "debug": None, "error": None, "warning": None} + ) + print("logger", logger) + return logger + + +@pytest.fixture +def interfaces(logger): + return { + "SoundFile": Interface.inject( + sound_monitor, + "SoundFile", + { + "sync": None, + }, + ), + "time": Interface.inject( + sound_monitor, + "time", + { + "sleep": None, + }, + ), + "datetime": Interface.inject(sound_monitor, "datetime", {now: now}), + } + + +@pytest.fixture +def task(interfaces): + return sound_monitor.Task() + + +@pytest.fixture +def delete_task(interfaces): + return sound_monitor.DeleteTask() + + +@pytest.fixture +def move_task(interfaces): + return sound_monitor.MoveTask() + + +@pytest.fixture +def modified_task(interfaces): + return sound_monitor.ModifiedTask() + + +class TestTask: + def test___init__(self, task): + assert task.timestamp is not None + + def test_ping(self, task): + task.timestamp = None + task.ping() + print("---", task.timestamp, now) + assert task.timestamp >= now + + @pytest.mark.django_db + def test___call__(self, logger, task, event): + task.log_msg = "--{event.src_path}--" + sound_file = task(event, logger=logger, kw=13) + assert sound_file._trace("sync", kw=True) == {"kw": 13} + assert logger._trace("info", args=True) == ( + task.log_msg.format(event=event), + ) + + +class TestDeleteTask: + @pytest.mark.django_db + def test___call__(self, delete_task, logger, task, event): + sound_file = delete_task(event, logger=logger) + assert sound_file._trace("sync", kw=True) == {"deleted": True} + + +class TestMoveTask: + @pytest.mark.django_db + def test__call___with_sound(self, move_task, sound, event, logger): + event.src_path = sound.file.name + sound_file = move_task(event, 
logger=logger) + assert isinstance(sound_file._trace("sync", kw="sound"), Sound) + assert sound_file.path == sound.file.name + + @pytest.mark.django_db + def test__call___no_sound(self, move_task, event, logger): + sound_file = move_task(event, logger=logger) + assert sound_file._trace("sync", kw=True) == {} + assert sound_file.path == event.dest_path + + +class TestModifiedTask: + def test_wait(self, modified_task): + dt_now = now + modified_task.timeout_delta - tz.timedelta(hours=10) + datetime = Interface.inject(sound_monitor, "datetime", {"now": dt_now}) + + def sleep(n): + datetime._imeta.funcs[ + "now" + ] = modified_task.timestamp + tz.timedelta(hours=10) + + time = Interface.inject(sound_monitor, "time", {"sleep": sleep}) + modified_task.wait() + assert time._trace("sleep", args=True) + + datetime._imeta.release() + + def test__call__(self): + pass + + +class TestMonitorHandler: + pass + + +class SoundMonitor: + pass diff --git a/aircox/tests/management/test_sound_monitor.py b/aircox/tests/management/_test_sound_monitor.py similarity index 100% rename from aircox/tests/management/test_sound_monitor.py rename to aircox/tests/management/_test_sound_monitor.py diff --git a/aircox/tests/management/test_sound_file.py b/aircox/tests/management/test_sound_file.py index 0f855dd..b06cbc8 100644 --- a/aircox/tests/management/test_sound_file.py +++ b/aircox/tests/management/test_sound_file.py @@ -6,7 +6,7 @@ from django.conf import settings as conf from django.utils import timezone as tz from aircox import models -from aircox.management.sound_file import SoundFile +from aircox.controllers.sound_file import SoundFile @pytest.fixture diff --git a/aircox/tests/models/test_episode.py b/aircox/tests/models/test_episode.py new file mode 100644 index 0000000..5c38610 --- /dev/null +++ b/aircox/tests/models/test_episode.py @@ -0,0 +1,64 @@ +import datetime +import pytest + +from aircox.conf import settings +from aircox import models + + +class TestEpisode: + 
@pytest.mark.django_db + def test_program(self, episode): + assert episode.program == episode.parent.program + + @pytest.mark.django_db + def test_podcasts(self, episode, podcasts): + podcasts = { + podcast.pk: podcast + for podcast in podcasts + if podcast.episode == episode + } + for data in episode.podcasts: + podcast = podcasts[data["pk"]] + assert data["name"] == podcast.name + assert data["page_url"] == episode.get_absolute_url() + assert data["page_title"] == episode.title + + @pytest.mark.django_db + def test_get_absolute_url_is_published(self, episode): + episode.status = models.Episode.STATUS_PUBLISHED + assert episode.get_absolute_url() != episode.parent.get_absolute_url() + + @pytest.mark.django_db + def test_get_absolute_url_not_published(self, episode): + episode.status = models.Episode.STATUS_DRAFT + assert episode.get_absolute_url() == episode.parent.get_absolute_url() + + @pytest.mark.django_db(transaction=False) + def test_save_raises_parent_is_none(self, episode): + with pytest.raises(ValueError): + episode.parent = None + episode.save() + + @pytest.mark.django_db + def test_get_default_title(self, programs): + program = programs[0] + date = datetime.date(2023, 5, 17) + result = models.Episode.get_default_title(program, date) + assert program.title in result + assert date.strftime(settings.EPISODE_TITLE_DATE_FORMAT) in result + + @pytest.mark.django_db + def test_get_init_kwargs_from(self, program): + date = datetime.date(2023, 3, 14) + date_str = date.strftime(settings.EPISODE_TITLE_DATE_FORMAT) + + kwargs = models.Episode.get_init_kwargs_from(program, date) + title = kwargs["title"] + assert program.title in title + assert date_str in title + + @pytest.mark.django_db + def test_get_init_kwargs_from_with_title(self, program): + title = "test title" + kwargs = models.Episode.get_init_kwargs_from(program, title=title) + assert title == kwargs["title"] diff --git a/aircox/views/admin.py b/aircox/views/admin.py index 31772b9..a29aad3 100644 --- 
a/aircox/views/admin.py +++ b/aircox/views/admin.py @@ -3,7 +3,7 @@ from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin from django.utils.translation import gettext_lazy as _ from django.views.generic import ListView -from ..models.log import LogArchiver +from aircox.controllers.log_archiver import LogArchiver from .log import LogListView __all__ = ["AdminMixin", "StatisticsView"]