#! /usr/bin/env python3
"""Monitor sound files; For each program, check for:

- new files;
- deleted files;
- differences between files and sound;
- quality of the files;

It tries to parse the file name to get the date of the diffusion of an
episode and associate the file with it, using the following format:

    yyyymmdd[_n][_][name]

Where:
    'yyyy' the year of the episode's diffusion;
    'mm' the month of the episode's diffusion;
    'dd' the day of the episode's diffusion;
    'n' the number of the episode (if multiple episodes);
    'name' the title of the sound;

To check quality of files, call the command sound_quality_check using the
parameters given by the setting SOUND_QUALITY.

This script requires Sox (and soxi).
"""
import atexit
from concurrent import futures
import logging
import time
import os

from django.utils.timezone import datetime, timedelta

from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

from aircox.conf import settings
from aircox.models import Sound, Program

from .sound_file import SoundFile


# FIXME: logger should be different in used classes (e.g. "aircox.commands")
# defaulting to logging.
logger = logging.getLogger("aircox.commands")


__all__ = (
    "Task",
    "CreateTask",
    "DeleteTask",
    "MoveTask",
    "ModifiedTask",
    "MonitorHandler",
)


class Task:
    """Base class used to execute a specific task on file change event.

    Handlers are sent to a multithread pool.
    """

    future = None
    """Future that promised the handler's call."""
    log_msg = None
    """Log message to display on event happens."""
    timestamp = None
    """Last ping timestamp (the event happened)."""

    def __init__(self, logger=logging):
        # ``logger`` is kept for backward compatibility but is not used here:
        # the logger is given to each ``__call__`` instead.
        self.ping()

    def ping(self):
        """Record the current time as the time of the latest event."""
        self.timestamp = datetime.now()

    def __call__(self, event, path=None, logger=logging, **kw):
        """Synchronise the sound file targeted by ``event``.

        :param event: watchdog event that triggered the task;
        :param str path: file path to sync, defaults to ``event.src_path``;
        :param logger: logger used to report ``log_msg``;
        :param **kw: extra arguments passed to ``SoundFile.sync``;
        :return: the synchronised ``SoundFile``.
        """
        sound_file = SoundFile(path or event.src_path)
        if self.log_msg:
            logger.info(self.log_msg.format(event=event, sound_file=sound_file))
        sound_file.sync(**kw)
        return sound_file


class CreateTask(Task):
    log_msg = "Sound file created: {sound_file.path}"


class DeleteTask(Task):
    log_msg = "Sound file deleted: {sound_file.path}"

    def __call__(self, *args, **kwargs):
        """Sync the file, telling ``SoundFile.sync`` it has been deleted."""
        kwargs["deleted"] = True
        return super().__call__(*args, **kwargs)


class MoveTask(Task):
    log_msg = "Sound file moved: {event.src_path} -> {event.dest_path}"

    def __call__(self, event, **kw):
        """Sync a moved file, reusing the sound registered at its old path.

        When a Sound exists for ``event.src_path``, it is given to
        ``SoundFile.sync`` along with the source path; otherwise the file is
        synced from its destination path.
        """
        # NOTE(review): ``event.src_path`` is absolute, while
        # ``SoundMonitor.scan_for_program`` filters ``Sound.file`` with
        # ``program.path`` (not ``program.abspath``). Confirm how ``Sound.file``
        # is stored, otherwise this lookup may never match.
        sound = Sound.objects.filter(file=event.src_path).first()
        if sound:
            kw = {**kw, "sound": sound, "path": event.src_path}
        else:
            kw["path"] = event.dest_path
        return super().__call__(event, **kw)


class ModifiedTask(Task):
    timeout_delta = timedelta(seconds=30)
    log_msg = "Sound file updated: {sound_file.path}"

    def wait(self):
        """Block until ``timeout_delta`` has elapsed since the last ping.

        Consecutive modification events for the same file ping the running
        task (see ``MonitorHandler._submit``), which pushes the deadline
        back: the file is only synced once no event happened for
        ``timeout_delta``.
        """
        # Note: ``timestamp`` is read without locking; a racing ping can only
        # shift the deadline slightly, which is not a real issue here.
        while True:
            remaining = self.timestamp + self.timeout_delta - datetime.now()
            if remaining <= timedelta(0):
                return
            # Sleep only until the current deadline instead of a full
            # ``timeout_delta``, so the sync is not delayed needlessly.
            time.sleep(remaining.total_seconds())

    def __call__(self, event, **kw):
        """Wait for the file to be stable, then sync it."""
        self.wait()
        return super().__call__(event, **kw)


class MonitorHandler(PatternMatchingEventHandler):
    """MonitorHandler is used as a Watchdog event handler.

    It uses a multithread pool in order to execute tasks on events. If a
    job already exists for this file and event, it pings existing job
    without creating a new one.
    """

    pool = None
    jobs = None

    def __init__(self, subdir, pool, jobs=None, **sync_kw):
        """
        :param str subdir: sub-directory in program dirs to monitor \
            (SOUND_ARCHIVES_SUBDIR or SOUND_EXCERPTS_SUBDIR);
        :param concurrent.futures.Executor pool: pool executing jobs on file
            change;
        :param dict jobs: running jobs indexed by key; when given (even
            empty), this very dict is used and updated;
        :param **sync_kw: kwargs passed to `SoundFile.sync`;
        """
        self.subdir = subdir
        self.pool = pool
        # ``jobs or {}`` would silently replace an empty dict the caller
        # wants to share.
        self.jobs = {} if jobs is None else jobs
        self.sync_kw = sync_kw

        patterns = [
            "*/{}/*{}".format(self.subdir, ext) for ext in settings.SOUND_FILE_EXT
        ]
        super().__init__(patterns=patterns, ignore_directories=True)

    def on_created(self, event):
        self._submit(CreateTask(), event, "new", **self.sync_kw)

    def on_deleted(self, event):
        # Only the logger is forwarded: other sync kwargs (e.g. ``broadcast``)
        # are not passed on deletion.
        kw = {"logger": self.sync_kw["logger"]} if "logger" in self.sync_kw else {}
        self._submit(DeleteTask(), event, "del", **kw)

    def on_moved(self, event):
        self._submit(MoveTask(), event, "mv", **self.sync_kw)

    def on_modified(self, event):
        self._submit(ModifiedTask(), event, "up", **self.sync_kw)

    def _submit(self, handler, event, job_key_prefix, **kwargs):
        """Send handler job to pool if not already running.

        Jobs are keyed by ``job_key_prefix`` and the event's source path.
        When an unfinished job exists for that key, it is pinged instead.

        :return: tuple with running job and boolean indicating if it is a
            new one.
        """
        key = job_key_prefix + ":" + event.src_path
        job = self.jobs.get(key)
        if job and not job.future.done():
            job.ping()
            return job, False

        handler.future = self.pool.submit(handler, event, **kwargs)
        self.jobs[key] = handler

        def done(r):
            # Only forget the job if it has not been replaced meanwhile.
            if self.jobs.get(key) is handler:
                del self.jobs[key]

        handler.future.add_done_callback(done)
        return handler, True


class SoundMonitor:
    """Monitor for filesystem changes in order to synchronise database and
    analyse files of a provided program."""

    def report(self, program=None, component=None, *content, logger=logging):
        """Log ``content`` prefixed by the program and optional component."""
        content = " ".join(map(str, content))
        logger.info(
            f"{program}: {content}"
            if not component
            else f"{program}, {component}: {content}"
        )

    def scan(self, logger=logging):
        """For all programs, scan dirs.

        Return scanned directories.
        """
        logger.info("scan all programs...")
        programs = Program.objects.filter()

        dirs = []
        for program in programs:
            logger.info(f"#{program.id} {program.title}")
            self.scan_for_program(
                program,
                settings.SOUND_BROADCASTS_SUBDIR,
                logger=logger,
                broadcast=True,
            )
            self.scan_for_program(
                program,
                settings.SOUND_EXCERPTS_SUBDIR,
                logger=logger,
                broadcast=False,
            )
            dirs.append(program.abspath)
        return dirs

    def scan_for_program(self, program, subdir, logger=logging, **sound_kwargs):
        """Scan a given directory that is associated to the given program,
        and update sounds information.

        Files of ``subdir`` with an extension from ``SOUND_FILE_EXT`` are
        synced with ``sound_kwargs``; the program's other sounds in database
        are then checked with ``check_sounds``.
        """
        logger.info("- %s/", subdir)
        if not program.ensure_dir(subdir):
            return

        abs_subdir = os.path.join(program.abspath, subdir)
        # ``str.endswith`` only accepts a str or a tuple: normalise once.
        extensions = tuple(settings.SOUND_FILE_EXT)
        sounds = []

        # sounds in directory
        for path in os.listdir(abs_subdir):
            path = os.path.join(abs_subdir, path)
            if not path.endswith(extensions):
                continue

            sound_file = SoundFile(path)
            sound_file.sync(program=program, **sound_kwargs)
            sounds.append(sound_file.sound.pk)

        # sounds in db & unchecked
        sounds = Sound.objects.filter(file__startswith=program.path).exclude(pk__in=sounds)
        self.check_sounds(sounds, program=program)

    def check_sounds(self, qs, **sync_kwargs):
        """Only check for the sound existence or update.

        Each sound for which ``sound.sync_fs(on_update=True)`` returns a
        truthy value is synced again from its file with ``sync_kwargs``.
        """
        for sound in qs:
            if sound.sync_fs(on_update=True):
                SoundFile(sound.file.path).sync(sound=sound, **sync_kwargs)

    _running = False

    def monitor(self, logger=logging):
        """Run in monitor mode.

        Watch broadcasts and excerpts sub-directories under
        ``settings.PROGRAMS_DIR_ABS`` and sync changed files in a thread
        pool, until ``stop()`` is called.

        :raise RuntimeError: when the monitor is already running.
        """
        if self._running:
            raise RuntimeError("already running")

        with futures.ThreadPoolExecutor() as pool:
            archives_handler = MonitorHandler(
                settings.SOUND_BROADCASTS_SUBDIR,
                pool,
                broadcast=True,
                logger=logger,
            )
            excerpts_handler = MonitorHandler(
                settings.SOUND_EXCERPTS_SUBDIR,
                pool,
                broadcast=False,
                logger=logger,
            )

            observer = Observer()
            observer.schedule(
                archives_handler,
                settings.PROGRAMS_DIR_ABS,
                recursive=True,
            )
            observer.schedule(
                excerpts_handler,
                settings.PROGRAMS_DIR_ABS,
                recursive=True,
            )
            observer.start()

            def leave():
                observer.stop()
                observer.join()

            atexit.register(leave)

            self._running = True
            try:
                while self._running:
                    time.sleep(1)
            finally:
                # Always stop the observer before leaving the ``with`` block
                # (even on KeyboardInterrupt): the pool is shut down there and
                # must not receive new jobs from watchdog threads.
                self._running = False
                leave()
                atexit.unregister(leave)

    def stop(self):
        """Stop monitor() loop."""
        self._running = False