From af1cbcda462eb6705fc756edebff9e0a8f9b3777 Mon Sep 17 00:00:00 2001 From: bkfox Date: Fri, 23 Oct 2015 17:37:44 +0200 Subject: [PATCH] add sound_quality_check --- .../commands/sound_quality_check.py | 173 ++++++++++++++++++ .../management/commands/sounds_monitor.py | 64 ++++--- 2 files changed, 216 insertions(+), 21 deletions(-) create mode 100644 aircox_programs/management/commands/sound_quality_check.py diff --git a/aircox_programs/management/commands/sound_quality_check.py b/aircox_programs/management/commands/sound_quality_check.py new file mode 100644 index 0000000..3bd74c4 --- /dev/null +++ b/aircox_programs/management/commands/sound_quality_check.py @@ -0,0 +1,173 @@ +""" +Analyse and check files using Sox, prints good and bad files. +""" +import sys +import re +import subprocess + +from argparse import RawTextHelpFormatter +from django.core.management.base import BaseCommand, CommandError + +class Stats: + attributes = [ + 'DC offset', 'Min level', 'Max level', + 'Pk lev dB', 'RMS lev dB', 'RMS Pk dB', + 'RMS Tr dB', 'Flat factor', 'Length s', + ] + + def __init__ (self, path, **kwargs): + """ + If path is given, call analyse with path and kwargs + """ + self.values = {} + if path: + self.analyse(path, **kwargs) + + def get (self, attr): + return self.values.get(attr) + + def parse (self, output): + for attr in Stats.attributes: + value = re.search(attr + r'\s+(?P\S+)', output) + value = value and value.groupdict() + if value: + try: + value = float(value.get('value')) + except ValueError: + value = None + self.values[attr] = value + self.values['length'] = self.values['Length s'] + + def analyse (self, path, at = None, length = None): + """ + If at and length are given use them as excerpt to analyse. + """ + args = ['sox', path, '-n'] + + if at is not None and length is not None: + args += ['trim', str(at), str(length) ] + + args.append('stats') + + p = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr = subprocess.PIPE) + # sox outputs to stderr (my god WHYYYY) + out_, out = p.communicate() + self.parse(str(out, encoding='utf-8')) + + +class Sound: + path = None # file path + sample_length = 120 # default sample length in seconds + stats = None # list of samples statistics + bad = None # list of bad samples + good = None # list of good samples + + def __init__ (self, path, sample_length = None): + self.path = path + self.sample_length = sample_length or self.sample_length + + def analyse (self): + print('- Complete file analysis') + self.stats = [ Stats(self.path) ] + position = 0 + length = self.stats[0].get('length') + + if not self.sample_length: + return + + print('- Samples analysis: ', end=' ') + while position < length: + print(len(self.stats), end=' ') + stats = Stats(self.path, at = position, length = self.sample_length) + self.stats.append(stats) + position += self.sample_length + print() + + def resume (self): + view = lambda array: [ + 'file' if index is 0 else + 'sample {} (at {} seconds)'.format(index, (index-1) * self.sample_length) + for index in self.good + ] + + if self.good: + print('- Good:\033[92m', ', '.join( view(self.good) ), '\033[0m') + if self.bad: + print('- Bad:\033[91m', ', '.join( view(self.bad) ), '\033[0m') + + def check (self, name, min_val, max_val): + self.good = [ index for index, stats in enumerate(self.stats) + if min_val <= stats.get(name) <= max_val ] + self.bad = [ index for index, stats in enumerate(self.stats) + if index not in self.good ] + self.resume() + + +class Command (BaseCommand): + help = __doc__ + sounds = None + + def add_arguments (self, parser): + parser.formatter_class=RawTextHelpFormatter + + parser.add_argument( + 'files', metavar='FILE', type=str, nargs='+', + help='file(s) to analyse' + ) + parser.add_argument( + '-s', '--sample_length', type=int, + help='size of sample to analyse in seconds. If not set (or 0), does' + ' not analyse by sample', + ) + parser.add_argument( + '-a', '--attribute', type=str, + help='attribute name to use to check, that can be:\n' + \ + ', '.join([ '"{}"'.format(attr) for attr in Stats.attributes ]) + ) + parser.add_argument( + '-r', '--range', type=float, nargs=2, + help='range of minimal and maximal accepted value such as: ' \ + '--range min max' + ) + parser.add_argument( + '-i', '--resume', action='store_true', + help='print a resume of good and bad files' + ) + + def handle (self, *args, **options): + # parameters + minmax = options.get('range') + if not minmax: + raise CommandError('no range specified') + + attr = options.get('attribute') + if not attr: + raise CommandError('no attribute specified') + + # sound analyse and checks + self.sounds = [ Sound(path, options.get('sample_length')) + for path in options.get('files') ] + self.bad = [] + self.good = [] + for sound in self.sounds: + print(sound.path) + sound.analyse() + sound.check(attr, minmax[0], minmax[1]) + print() + if sound.bad: + self.bad.append(sound) + else: + self.good.append(sound) + + # resume + if options.get('resume'): + if good: + print('Files that did not failed the test:\033[92m\n ', + '\n '.join(good), '\033[0m') + if bad: + # bad at the end for ergonomy + print('Files that failed the test:\033[91m\n ', + '\n '.join(bad),'\033[0m') + diff --git a/aircox_programs/management/commands/sounds_monitor.py b/aircox_programs/management/commands/sounds_monitor.py index 3c6098b..64b7bfd 100644 --- a/aircox_programs/management/commands/sounds_monitor.py +++ b/aircox_programs/management/commands/sounds_monitor.py @@ -1,6 +1,9 @@ """ -Check over programs' sound files, scan them, and add them to the -database if they are not there yet. +Monitor sound files; For each program, check for: +- new files; +- deleted files; +- differences between files and sound; +- quality of the files; It tries to parse the file name to get the date of the diffusion of an episode and associate the file with it; We use the following format: @@ -12,16 +15,20 @@ Where: 'dd' is the day of the episode's diffusion; 'n' is the number of the episode (if multiple episodes); 'title' the title of the sound; -""" + +To check quality of files, call the command sound_quality_check using the +parameters given by the setting AIRCOX_SOUND_QUALITY. +""" import os import re from argparse import RawTextHelpFormatter -from django.core.management.base import BaseCommand, CommandError -from django.utils import timezone -from aircox_programs.models import * -import aircox_programs.settings as settings +from django.core.management.base import BaseCommand, CommandError +from django.utils import timezone + +from aircox_programs.models import * +import aircox_programs.settings as settings class Command (BaseCommand): @@ -40,9 +47,17 @@ class Command (BaseCommand): programs = Program.objects.filter() for program in programs: - self.check(program, program.path + '/public', public = True) - self.check(program, program.path + '/podcasts', embed = True) - self.check(program, program.path + '/private') + path = lambda x: os.path.join(program.path, x) + self.check_files( + program, path(settings.AIRCOX_SOUND_ARCHIVES_SUBDIR), + archive = True, + ) + self.check_files( + program, path(settings.AIRCOX_SOUND_EXCERPTS_SUBDIR), + excerpt = True, + ) + + self.check_quality() def get_sound_info (self, path): """ @@ -97,28 +112,24 @@ class Command (BaseCommand): return diffusion.episode or None - def check (self, program, dir_path, public = False, embed = False): + def check_files (self, program, dir_path, **sound_kwargs): """ Scan a given directory that is associated to the given program, and - update sounds information - - Return a list of scanned sounds + update sounds information. """ if not os.path.exists(dir_path): return - paths = [] + # new/existing sounds for path in os.listdir(dir_path): path = dir_path + '/' + path if not path.endswith(settings.AIRCOX_SOUNDFILE_EXT): continue - paths.append(path) - sound_info = self.get_sound_info(path) sound = self.ensure_sound(sound_info) - - sound.public = public + sound.__dict__.update(sound_kwargs) + sound.save(check = False) # episode and relation if 'year' in sound_info: @@ -128,9 +139,20 @@ class Command (BaseCommand): if sound_.path == sound.path: break else: - self.report(program, path, 'associate sound to episode ', + self.report(program, path, 'add sound to episode ', episode.id) episode.sounds.add(sound) - return paths + episode.save() + + # check files + for sound in Sound.object.filter(path__startswith = path): + if sound.check(): + sound.save(check = False) + + + def check_quality (self): + """ + Check all files where quality has been set to bad + """