add sound_quality_check

This commit is contained in:
bkfox 2015-10-23 17:37:44 +02:00
parent 474a33cfa5
commit af1cbcda46
2 changed files with 216 additions and 21 deletions

View File

@ -0,0 +1,173 @@
"""
Analyse and check files using Sox, prints good and bad files.
"""
import sys
import re
import subprocess
from argparse import RawTextHelpFormatter
from django.core.management.base import BaseCommand, CommandError
class Stats:
attributes = [
'DC offset', 'Min level', 'Max level',
'Pk lev dB', 'RMS lev dB', 'RMS Pk dB',
'RMS Tr dB', 'Flat factor', 'Length s',
]
def __init__ (self, path, **kwargs):
"""
If path is given, call analyse with path and kwargs
"""
self.values = {}
if path:
self.analyse(path, **kwargs)
def get (self, attr):
return self.values.get(attr)
def parse (self, output):
for attr in Stats.attributes:
value = re.search(attr + r'\s+(?P<value>\S+)', output)
value = value and value.groupdict()
if value:
try:
value = float(value.get('value'))
except ValueError:
value = None
self.values[attr] = value
self.values['length'] = self.values['Length s']
def analyse (self, path, at = None, length = None):
"""
If at and length are given use them as excerpt to analyse.
"""
args = ['sox', path, '-n']
if at is not None and length is not None:
args += ['trim', str(at), str(length) ]
args.append('stats')
p = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr = subprocess.PIPE)
# sox outputs to stderr (my god WHYYYY)
out_, out = p.communicate()
self.parse(str(out, encoding='utf-8'))
class Sound:
path = None # file path
sample_length = 120 # default sample length in seconds
stats = None # list of samples statistics
bad = None # list of bad samples
good = None # list of good samples
def __init__ (self, path, sample_length = None):
self.path = path
self.sample_length = sample_length or self.sample_length
def analyse (self):
print('- Complete file analysis')
self.stats = [ Stats(self.path) ]
position = 0
length = self.stats[0].get('length')
if not self.sample_length:
return
print('- Samples analysis: ', end=' ')
while position < length:
print(len(self.stats), end=' ')
stats = Stats(self.path, at = position, length = self.sample_length)
self.stats.append(stats)
position += self.sample_length
print()
def resume (self):
view = lambda array: [
'file' if index is 0 else
'sample {} (at {} seconds)'.format(index, (index-1) * self.sample_length)
for index in self.good
]
if self.good:
print('- Good:\033[92m', ', '.join( view(self.good) ), '\033[0m')
if self.bad:
print('- Bad:\033[91m', ', '.join( view(self.bad) ), '\033[0m')
def check (self, name, min_val, max_val):
self.good = [ index for index, stats in enumerate(self.stats)
if min_val <= stats.get(name) <= max_val ]
self.bad = [ index for index, stats in enumerate(self.stats)
if index not in self.good ]
self.resume()
class Command (BaseCommand):
help = __doc__
sounds = None
def add_arguments (self, parser):
parser.formatter_class=RawTextHelpFormatter
parser.add_argument(
'files', metavar='FILE', type=str, nargs='+',
help='file(s) to analyse'
)
parser.add_argument(
'-s', '--sample_length', type=int,
help='size of sample to analyse in seconds. If not set (or 0), does'
' not analyse by sample',
)
parser.add_argument(
'-a', '--attribute', type=str,
help='attribute name to use to check, that can be:\n' + \
', '.join([ '"{}"'.format(attr) for attr in Stats.attributes ])
)
parser.add_argument(
'-r', '--range', type=float, nargs=2,
help='range of minimal and maximal accepted value such as: ' \
'--range min max'
)
parser.add_argument(
'-i', '--resume', action='store_true',
help='print a resume of good and bad files'
)
def handle (self, *args, **options):
# parameters
minmax = options.get('range')
if not minmax:
raise CommandError('no range specified')
attr = options.get('attribute')
if not attr:
raise CommandError('no attribute specified')
# sound analyse and checks
self.sounds = [ Sound(path, options.get('sample_length'))
for path in options.get('files') ]
self.bad = []
self.good = []
for sound in self.sounds:
print(sound.path)
sound.analyse()
sound.check(attr, minmax[0], minmax[1])
print()
if sound.bad:
self.bad.append(sound)
else:
self.good.append(sound)
# resume
if options.get('resume'):
if good:
print('Files that did not failed the test:\033[92m\n ',
'\n '.join(good), '\033[0m')
if bad:
# bad at the end for ergonomy
print('Files that failed the test:\033[91m\n ',
'\n '.join(bad),'\033[0m')

View File

@ -1,6 +1,9 @@
""" """
Check over programs' sound files, scan them, and add them to the Monitor sound files; For each program, check for:
database if they are not there yet. - new files;
- deleted files;
- differences between files and sound;
- quality of the files;
It tries to parse the file name to get the date of the diffusion of an It tries to parse the file name to get the date of the diffusion of an
episode and associate the file with it; We use the following format: episode and associate the file with it; We use the following format:
@ -12,16 +15,20 @@ Where:
'dd' is the day of the episode's diffusion; 'dd' is the day of the episode's diffusion;
'n' is the number of the episode (if multiple episodes); 'n' is the number of the episode (if multiple episodes);
'title' the title of the sound; 'title' the title of the sound;
"""
To check quality of files, call the command sound_quality_check using the
parameters given by the setting AIRCOX_SOUND_QUALITY.
"""
import os import os
import re import re
from argparse import RawTextHelpFormatter from argparse import RawTextHelpFormatter
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone from django.utils import timezone
from aircox_programs.models import *
import aircox_programs.settings as settings from aircox_programs.models import *
import aircox_programs.settings as settings
class Command (BaseCommand): class Command (BaseCommand):
@ -40,9 +47,17 @@ class Command (BaseCommand):
programs = Program.objects.filter() programs = Program.objects.filter()
for program in programs: for program in programs:
self.check(program, program.path + '/public', public = True) path = lambda x: os.path.join(program.path, x)
self.check(program, program.path + '/podcasts', embed = True) self.check_files(
self.check(program, program.path + '/private') program, path(settings.AIRCOX_SOUND_ARCHIVES_SUBDIR),
archive = True,
)
self.check_files(
program, path(settings.AIRCOX_SOUND_EXCERPTS_SUBDIR),
excerpt = True,
)
self.check_quality()
def get_sound_info (self, path): def get_sound_info (self, path):
""" """
@ -97,28 +112,24 @@ class Command (BaseCommand):
return diffusion.episode or None return diffusion.episode or None
def check (self, program, dir_path, public = False, embed = False): def check_files (self, program, dir_path, **sound_kwargs):
""" """
Scan a given directory that is associated to the given program, and Scan a given directory that is associated to the given program, and
update sounds information update sounds information.
Return a list of scanned sounds
""" """
if not os.path.exists(dir_path): if not os.path.exists(dir_path):
return return
paths = [] # new/existing sounds
for path in os.listdir(dir_path): for path in os.listdir(dir_path):
path = dir_path + '/' + path path = dir_path + '/' + path
if not path.endswith(settings.AIRCOX_SOUNDFILE_EXT): if not path.endswith(settings.AIRCOX_SOUNDFILE_EXT):
continue continue
paths.append(path)
sound_info = self.get_sound_info(path) sound_info = self.get_sound_info(path)
sound = self.ensure_sound(sound_info) sound = self.ensure_sound(sound_info)
sound.__dict__.update(sound_kwargs)
sound.public = public sound.save(check = False)
# episode and relation # episode and relation
if 'year' in sound_info: if 'year' in sound_info:
@ -128,9 +139,20 @@ class Command (BaseCommand):
if sound_.path == sound.path: if sound_.path == sound.path:
break break
else: else:
self.report(program, path, 'associate sound to episode ', self.report(program, path, 'add sound to episode ',
episode.id) episode.id)
episode.sounds.add(sound) episode.sounds.add(sound)
return paths episode.save()
# check files
for sound in Sound.object.filter(path__startswith = path):
if sound.check():
sound.save(check = False)
def check_quality (self):
"""
Check all files where quality has been set to bad
"""