forked from rc/aircox
		
	move files
This commit is contained in:
		
							
								
								
									
										0
									
								
								programs/management/commands/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								programs/management/commands/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										0
									
								
								programs/management/commands/_private.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								programs/management/commands/_private.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										158
									
								
								programs/management/commands/diffusions_monitor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										158
									
								
								programs/management/commands/diffusions_monitor.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,158 @@
 | 
			
		||||
"""
 | 
			
		||||
Manage diffusions using schedules, to update, clean up or check diffusions.
 | 
			
		||||
 | 
			
		||||
A generated diffusion can be unconfirmed, that means that the user must confirm
 | 
			
		||||
it by changing its type to "normal". The behaviour is controlled using
 | 
			
		||||
--approval.
 | 
			
		||||
 | 
			
		||||
Different actions are available:
 | 
			
		||||
- "update" is the process that is used to generated them using programs
 | 
			
		||||
schedules for the (given) month.
 | 
			
		||||
 | 
			
		||||
- "clean" will remove all diffusions that are still unconfirmed and have been
 | 
			
		||||
planified before the (given) month.
 | 
			
		||||
 | 
			
		||||
- "check" will remove all diffusions that are unconfirmed and have been planified
 | 
			
		||||
from the (given) month and later.
 | 
			
		||||
"""
 | 
			
		||||
from argparse import RawTextHelpFormatter
 | 
			
		||||
from django.core.management.base import BaseCommand, CommandError
 | 
			
		||||
from django.utils import timezone as tz
 | 
			
		||||
from aircox.programs.models import *
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Actions:
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def __check_conflicts (item, saved_items):
 | 
			
		||||
        """
 | 
			
		||||
        Check for conflicts, and update conflictual
 | 
			
		||||
        items if they have been generated during this
 | 
			
		||||
        update.
 | 
			
		||||
 | 
			
		||||
        Return the number of conflicts
 | 
			
		||||
        """
 | 
			
		||||
        conflicts = item.get_conflicts()
 | 
			
		||||
        if not conflicts:
 | 
			
		||||
            item.type = Diffusion.Type['normal']
 | 
			
		||||
            return 0
 | 
			
		||||
 | 
			
		||||
        item.type = Diffusion.Type['unconfirmed']
 | 
			
		||||
        for conflict in conflicts:
 | 
			
		||||
            if conflict.pk in saved_items and \
 | 
			
		||||
                    conflict.type != Diffusion.Type['unconfirmed']:
 | 
			
		||||
                conflict.type = Diffusion.Type['unconfirmed']
 | 
			
		||||
                conflict.save()
 | 
			
		||||
        return len(conflicts)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def update (cl, date, mode):
 | 
			
		||||
        manual = (mode == 'manual')
 | 
			
		||||
        if not manual:
 | 
			
		||||
            saved_items = set()
 | 
			
		||||
 | 
			
		||||
        count = [0, 0]
 | 
			
		||||
        for schedule in Schedule.objects.filter(program__active = True) \
 | 
			
		||||
                .order_by('initial'):
 | 
			
		||||
            # in order to allow rerun links between diffusions, we save items
 | 
			
		||||
            # by schedule;
 | 
			
		||||
            items = schedule.diffusions_of_month(date, exclude_saved = True)
 | 
			
		||||
            count[0] += len(items)
 | 
			
		||||
 | 
			
		||||
            if manual:
 | 
			
		||||
                Diffusion.objects.bulk_create(items)
 | 
			
		||||
            else:
 | 
			
		||||
                for item in items:
 | 
			
		||||
                    count[1] += cl.__check_conflicts(item, saved_items)
 | 
			
		||||
                    item.save()
 | 
			
		||||
                    saved_items.add(item)
 | 
			
		||||
 | 
			
		||||
            print('> {} new diffusions for schedule #{} ({})'.format(
 | 
			
		||||
                    len(items), schedule.id, str(schedule)
 | 
			
		||||
                 ))
 | 
			
		||||
 | 
			
		||||
        print('total of {} diffusions have been created,'.format(count[0]),
 | 
			
		||||
              'do not forget manual approval' if manual else
 | 
			
		||||
                '{} conflicts found'.format(count[1]))
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def clean (date):
 | 
			
		||||
        qs = Diffusion.objects.filter(type = Diffusion.Type['unconfirmed'],
 | 
			
		||||
                                      date__lt = date)
 | 
			
		||||
        print('{} diffusions will be removed'.format(qs.count()))
 | 
			
		||||
        qs.delete()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def check (date):
 | 
			
		||||
        qs = Diffusion.objects.filter(type = Diffusion.Type['unconfirmed'],
 | 
			
		||||
                                      date__gt = date)
 | 
			
		||||
        items = []
 | 
			
		||||
        for diffusion in qs:
 | 
			
		||||
            schedules = Schedule.objects.filter(program = diffusion.program)
 | 
			
		||||
            for schedule in schedules:
 | 
			
		||||
                if schedule.match(diffusion.date):
 | 
			
		||||
                    break
 | 
			
		||||
            else:
 | 
			
		||||
                print('> #{}: {}'.format(diffusion.pk, str(diffusion)))
 | 
			
		||||
                items.append(diffusion.id)
 | 
			
		||||
 | 
			
		||||
        print('{} diffusions will be removed'.format(len(items)))
 | 
			
		||||
        if len(items):
 | 
			
		||||
            Diffusion.objects.filter(id__in = items).delete()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Command (BaseCommand):
 | 
			
		||||
    help= __doc__
 | 
			
		||||
 | 
			
		||||
    def add_arguments (self, parser):
 | 
			
		||||
        parser.formatter_class=RawTextHelpFormatter
 | 
			
		||||
        now = tz.datetime.today()
 | 
			
		||||
 | 
			
		||||
        group = parser.add_argument_group('action')
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '--update', action='store_true',
 | 
			
		||||
            help='generate (unconfirmed) diffusions for the given month. '
 | 
			
		||||
                 'These diffusions must be confirmed manually by changing '
 | 
			
		||||
                 'their type to "normal"')
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '--clean', action='store_true',
 | 
			
		||||
            help='remove unconfirmed diffusions older than the given month')
 | 
			
		||||
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '--check', action='store_true',
 | 
			
		||||
            help='check future unconfirmed diffusions from the given date '
 | 
			
		||||
                 'agains\'t schedules and remove it if that do not match any '
 | 
			
		||||
                 'schedule')
 | 
			
		||||
 | 
			
		||||
        group = parser.add_argument_group('date')
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '--year', type=int, default=now.year,
 | 
			
		||||
            help='used by update, default is today\'s year')
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '--month', type=int, default=now.month,
 | 
			
		||||
            help='used by update, default is today\'s month')
 | 
			
		||||
 | 
			
		||||
        group = parser.add_argument_group('mode')
 | 
			
		||||
        group.add_argument(
 | 
			
		||||
            '--approval', type=str, choices=['manual', 'auto'],
 | 
			
		||||
            default='auto',
 | 
			
		||||
            help='manual means that all generated diffusions are unconfirmed, '
 | 
			
		||||
                 'thus must be approved manually; auto confirmes all '
 | 
			
		||||
                 'diffusions except those that conflicts with others'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def handle (self, *args, **options):
 | 
			
		||||
        date = tz.datetime(year = options.get('year'),
 | 
			
		||||
                                 month = options.get('month'),
 | 
			
		||||
                                 day = 1)
 | 
			
		||||
        date = tz.make_aware(date)
 | 
			
		||||
 | 
			
		||||
        if options.get('update'):
 | 
			
		||||
            Actions.update(date, mode = options.get('mode'))
 | 
			
		||||
        elif options.get('clean'):
 | 
			
		||||
            Actions.clean(date)
 | 
			
		||||
        elif options.get('check'):
 | 
			
		||||
            Actions.check(date)
 | 
			
		||||
        else:
 | 
			
		||||
            raise CommandError('no action has been given')
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										228
									
								
								programs/management/commands/sounds_monitor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										228
									
								
								programs/management/commands/sounds_monitor.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,228 @@
 | 
			
		||||
"""
 | 
			
		||||
Monitor sound files; For each program, check for:
 | 
			
		||||
- new files;
 | 
			
		||||
- deleted files;
 | 
			
		||||
- differences between files and sound;
 | 
			
		||||
- quality of the files;
 | 
			
		||||
 | 
			
		||||
It tries to parse the file name to get the date of the diffusion of an
 | 
			
		||||
episode and associate the file with it; We use the following format:
 | 
			
		||||
    yyyymmdd[_n][_][name]
 | 
			
		||||
 | 
			
		||||
Where:
 | 
			
		||||
    'yyyy' the year of the episode's diffusion;
 | 
			
		||||
    'mm' the month of the episode's diffusion;
 | 
			
		||||
    'dd' the day of the episode's diffusion;
 | 
			
		||||
    'n' the number of the episode (if multiple episodes);
 | 
			
		||||
    'name' the title of the sound;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
To check quality of files, call the command sound_quality_check using the
 | 
			
		||||
parameters given by the setting AIRCOX_SOUND_QUALITY. This script requires
 | 
			
		||||
Sox (and soxi).
 | 
			
		||||
"""
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
import subprocess
 | 
			
		||||
from argparse import RawTextHelpFormatter
 | 
			
		||||
 | 
			
		||||
from django.core.management.base import BaseCommand, CommandError
 | 
			
		||||
 | 
			
		||||
from aircox.programs.models import *
 | 
			
		||||
import aircox.programs.settings as settings
 | 
			
		||||
import aircox.programs.utils as utils
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Command (BaseCommand):
 | 
			
		||||
    help= __doc__
 | 
			
		||||
 | 
			
		||||
    def report (self, program = None, component = None, *content):
 | 
			
		||||
        if not component:
 | 
			
		||||
            print('{}: '.format(program), *content)
 | 
			
		||||
        else:
 | 
			
		||||
            print('{}, {}: '.format(program, component), *content)
 | 
			
		||||
 | 
			
		||||
    def add_arguments (self, parser):
 | 
			
		||||
        parser.formatter_class=RawTextHelpFormatter
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            '-q', '--quality_check', action='store_true',
 | 
			
		||||
            help='Enable quality check using sound_quality_check on all ' \
 | 
			
		||||
                 'sounds marqued as not good'
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            '-s', '--scan', action='store_true',
 | 
			
		||||
            help='Scan programs directories for changes, plus check for a '
 | 
			
		||||
                 ' matching episode on sounds that have not been yet assigned'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def handle (self, *args, **options):
 | 
			
		||||
        if options.get('scan'):
 | 
			
		||||
            self.scan()
 | 
			
		||||
        if options.get('quality_check'):
 | 
			
		||||
            self.check_quality(check = (not options.get('scan')) )
 | 
			
		||||
 | 
			
		||||
    def _get_duration (self, path):
 | 
			
		||||
        p = subprocess.Popen(['soxi', '-D', path], stdout=subprocess.PIPE,
 | 
			
		||||
                             stderr=subprocess.PIPE)
 | 
			
		||||
        out, err = p.communicate()
 | 
			
		||||
        if not err:
 | 
			
		||||
            return utils.seconds_to_time(int(float(out)))
 | 
			
		||||
 | 
			
		||||
    def get_sound_info (self, program, path):
 | 
			
		||||
        """
 | 
			
		||||
        Parse file name to get info on the assumption it has the correct
 | 
			
		||||
        format (given in Command.help)
 | 
			
		||||
        """
 | 
			
		||||
        file_name = os.path.basename(path)
 | 
			
		||||
        file_name = os.path.splitext(file_name)[0]
 | 
			
		||||
        r = re.search('^(?P<year>[0-9]{4})'
 | 
			
		||||
                      '(?P<month>[0-9]{2})'
 | 
			
		||||
                      '(?P<day>[0-9]{2})'
 | 
			
		||||
                      '(_(?P<n>[0-9]+))?'
 | 
			
		||||
                      '_?(?P<name>.*)$',
 | 
			
		||||
                      file_name)
 | 
			
		||||
 | 
			
		||||
        if not (r and r.groupdict()):
 | 
			
		||||
            self.report(program, path, "file path is not correct, use defaults")
 | 
			
		||||
            r = {
 | 
			
		||||
                'name': file_name
 | 
			
		||||
            }
 | 
			
		||||
        else:
 | 
			
		||||
            r = r.groupdict()
 | 
			
		||||
 | 
			
		||||
        r['duration'] = self._get_duration(path)
 | 
			
		||||
        r['name'] = r['name'].replace('_', ' ').capitalize()
 | 
			
		||||
        r['path'] = path
 | 
			
		||||
        return r
 | 
			
		||||
 | 
			
		||||
    def find_initial (self, program, sound_info):
 | 
			
		||||
        """
 | 
			
		||||
        For a given program, and sound path check if there is an initial
 | 
			
		||||
        diffusion to associate to, using the diffusion's date.
 | 
			
		||||
 | 
			
		||||
        If there is no matching episode, return None.
 | 
			
		||||
        """
 | 
			
		||||
        # check on episodes
 | 
			
		||||
        diffusion = Diffusion.objects.filter(
 | 
			
		||||
            program = program,
 | 
			
		||||
            date__year = int(sound_info['year']),
 | 
			
		||||
            date__month = int(sound_info['month']),
 | 
			
		||||
            date__day = int(sound_info['day'])
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if not diffusion.count():
 | 
			
		||||
            self.report(program, sound_info['path'],
 | 
			
		||||
                        'no diffusion found for the given date')
 | 
			
		||||
            return
 | 
			
		||||
        return diffusion[0]
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def check_sounds (qs):
 | 
			
		||||
        """
 | 
			
		||||
        Only check for the sound existence or update
 | 
			
		||||
        """
 | 
			
		||||
        # check files
 | 
			
		||||
        for sound in qs:
 | 
			
		||||
            if sound.check_on_file():
 | 
			
		||||
                sound.save(check = False)
 | 
			
		||||
 | 
			
		||||
    def scan (self):
 | 
			
		||||
        """
 | 
			
		||||
        For all programs, scan dirs
 | 
			
		||||
        """
 | 
			
		||||
        print('scan files for all programs...')
 | 
			
		||||
        programs = Program.objects.filter()
 | 
			
		||||
 | 
			
		||||
        for program in programs:
 | 
			
		||||
            print('- program', program.name)
 | 
			
		||||
            self.scan_for_program(
 | 
			
		||||
                program, settings.AIRCOX_SOUND_ARCHIVES_SUBDIR,
 | 
			
		||||
                type = Sound.Type['archive'],
 | 
			
		||||
            )
 | 
			
		||||
            self.scan_for_program(
 | 
			
		||||
                program, settings.AIRCOX_SOUND_EXCERPTS_SUBDIR,
 | 
			
		||||
                type = Sound.Type['excerpt'],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def scan_for_program (self, program, subdir, **sound_kwargs):
 | 
			
		||||
        """
 | 
			
		||||
        Scan a given directory that is associated to the given program, and
 | 
			
		||||
        update sounds information.
 | 
			
		||||
        """
 | 
			
		||||
        print(' - scan files in', subdir)
 | 
			
		||||
        if not program.ensure_dir(subdir):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        subdir = os.path.join(program.path, subdir)
 | 
			
		||||
 | 
			
		||||
        # new/existing sounds
 | 
			
		||||
        for path in os.listdir(subdir):
 | 
			
		||||
            path = os.path.join(subdir, path)
 | 
			
		||||
            if not path.endswith(settings.AIRCOX_SOUND_FILE_EXT):
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            sound_info = self.get_sound_info(program, path)
 | 
			
		||||
            sound = Sound.objects.get_or_create(
 | 
			
		||||
                path = path,
 | 
			
		||||
                defaults = { 'name': sound_info['name'],
 | 
			
		||||
                             'duration': sound_info['duration'] or None }
 | 
			
		||||
            )[0]
 | 
			
		||||
            sound.__dict__.update(sound_kwargs)
 | 
			
		||||
            sound.save(check = False)
 | 
			
		||||
 | 
			
		||||
            # initial diffusion association
 | 
			
		||||
            if 'year' in sound_info:
 | 
			
		||||
                initial = self.find_initial(program, sound_info)
 | 
			
		||||
                if initial:
 | 
			
		||||
                    if initial.initial:
 | 
			
		||||
                        # FIXME: allow user to overwrite rerun info?
 | 
			
		||||
                        self.report(program, path,
 | 
			
		||||
                                    'the diffusion must be an initial diffusion')
 | 
			
		||||
                    else:
 | 
			
		||||
                        sound = initial.sounds.get_queryset() \
 | 
			
		||||
                                    .filter(path == sound.path)
 | 
			
		||||
                        if not sound:
 | 
			
		||||
                            self.report(program, path,
 | 
			
		||||
                                        'add sound to diffusion ', initial.id)
 | 
			
		||||
                            initial.sounds.add(sound)
 | 
			
		||||
                            initial.save()
 | 
			
		||||
 | 
			
		||||
        self.check_sounds(Sound.objects.filter(path__startswith = subdir))
 | 
			
		||||
 | 
			
		||||
    def check_quality (self, check = False):
 | 
			
		||||
        """
 | 
			
		||||
        Check all files where quality has been set to bad
 | 
			
		||||
        """
 | 
			
		||||
        import aircox.programs.management.commands.sounds_quality_check \
 | 
			
		||||
                as quality_check
 | 
			
		||||
 | 
			
		||||
        sounds = Sound.objects.filter(good_quality = False)
 | 
			
		||||
        if check:
 | 
			
		||||
            self.check_sounds(sounds)
 | 
			
		||||
            files = [ sound.path for sound in sounds if not sound.removed ]
 | 
			
		||||
        else:
 | 
			
		||||
            files = [ sound.path for sound in sounds.filter(removed = False) ]
 | 
			
		||||
 | 
			
		||||
        print('start quality check...')
 | 
			
		||||
        cmd = quality_check.Command()
 | 
			
		||||
        cmd.handle( files = files,
 | 
			
		||||
                    **settings.AIRCOX_SOUND_QUALITY )
 | 
			
		||||
 | 
			
		||||
        print('- update sounds in database')
 | 
			
		||||
        def update_stats(sound_info, sound):
 | 
			
		||||
            stats = sound_info.get_file_stats()
 | 
			
		||||
            if stats:
 | 
			
		||||
                duration = int(stats.get('length'))
 | 
			
		||||
                sound.duration = utils.seconds_to_time(duration)
 | 
			
		||||
 | 
			
		||||
        for sound_info in cmd.good:
 | 
			
		||||
            sound = Sound.objects.get(path = sound_info.path)
 | 
			
		||||
            sound.good_quality = True
 | 
			
		||||
            update_stats(sound_info, sound)
 | 
			
		||||
            sound.save(check = False)
 | 
			
		||||
 | 
			
		||||
        for sound_info in cmd.bad:
 | 
			
		||||
            sound = Sound.objects.get(path = sound_info.path)
 | 
			
		||||
            update_stats(sound_info, sound)
 | 
			
		||||
            sound.save(check = False)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										175
									
								
								programs/management/commands/sounds_quality_check.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										175
									
								
								programs/management/commands/sounds_quality_check.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,175 @@
 | 
			
		||||
"""
 | 
			
		||||
Analyse and check files using Sox, prints good and bad files.
 | 
			
		||||
"""
 | 
			
		||||
import sys
 | 
			
		||||
import re
 | 
			
		||||
import subprocess
 | 
			
		||||
 | 
			
		||||
from argparse import RawTextHelpFormatter
 | 
			
		||||
from django.core.management.base import BaseCommand, CommandError
 | 
			
		||||
 | 
			
		||||
class Stats:
 | 
			
		||||
    attributes = [
 | 
			
		||||
        'DC offset', 'Min level', 'Max level',
 | 
			
		||||
        'Pk lev dB', 'RMS lev dB', 'RMS Pk dB',
 | 
			
		||||
        'RMS Tr dB', 'Flat factor', 'Length s',
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    def __init__ (self, path, **kwargs):
 | 
			
		||||
        """
 | 
			
		||||
        If path is given, call analyse with path and kwargs
 | 
			
		||||
        """
 | 
			
		||||
        self.values = {}
 | 
			
		||||
        if path:
 | 
			
		||||
            self.analyse(path, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def get (self, attr):
 | 
			
		||||
        return self.values.get(attr)
 | 
			
		||||
 | 
			
		||||
    def parse (self, output):
 | 
			
		||||
        for attr in Stats.attributes:
 | 
			
		||||
            value = re.search(attr + r'\s+(?P<value>\S+)', output)
 | 
			
		||||
            value = value and value.groupdict()
 | 
			
		||||
            if value:
 | 
			
		||||
                try:
 | 
			
		||||
                    value = float(value.get('value'))
 | 
			
		||||
                except ValueError:
 | 
			
		||||
                    value = None
 | 
			
		||||
                self.values[attr] = value
 | 
			
		||||
        self.values['length'] = self.values['Length s']
 | 
			
		||||
 | 
			
		||||
    def analyse (self, path, at = None, length = None):
 | 
			
		||||
        """
 | 
			
		||||
        If at and length are given use them as excerpt to analyse.
 | 
			
		||||
        """
 | 
			
		||||
        args = ['sox', path, '-n']
 | 
			
		||||
 | 
			
		||||
        if at is not None and length is not None:
 | 
			
		||||
            args += ['trim', str(at), str(length) ]
 | 
			
		||||
 | 
			
		||||
        args.append('stats')
 | 
			
		||||
 | 
			
		||||
        p = subprocess.Popen(args, stdout=subprocess.PIPE,
 | 
			
		||||
                             stderr=subprocess.PIPE)
 | 
			
		||||
        # sox outputs to stderr (my god WHYYYY)
 | 
			
		||||
        out_, out = p.communicate()
 | 
			
		||||
        self.parse(str(out, encoding='utf-8'))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Sound:
 | 
			
		||||
    path = None             # file path
 | 
			
		||||
    sample_length = 120     # default sample length in seconds
 | 
			
		||||
    stats = None            # list of samples statistics
 | 
			
		||||
    bad = None              # list of bad samples
 | 
			
		||||
    good = None             # list of good samples
 | 
			
		||||
 | 
			
		||||
    def __init__ (self, path, sample_length = None):
 | 
			
		||||
        self.path = path
 | 
			
		||||
        self.sample_length = sample_length if sample_length is not None \
 | 
			
		||||
                                else self.sample_length
 | 
			
		||||
 | 
			
		||||
    def get_file_stats (self):
 | 
			
		||||
        return self.stats and self.stats[0]
 | 
			
		||||
 | 
			
		||||
    def analyse (self):
 | 
			
		||||
        print('- complete file analysis')
 | 
			
		||||
        self.stats = [ Stats(self.path) ]
 | 
			
		||||
        position = 0
 | 
			
		||||
        length = self.stats[0].get('length')
 | 
			
		||||
 | 
			
		||||
        if not self.sample_length:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        print('- samples analysis: ', end=' ')
 | 
			
		||||
        while position < length:
 | 
			
		||||
            print(len(self.stats), end=' ')
 | 
			
		||||
            stats = Stats(self.path, at = position, length = self.sample_length)
 | 
			
		||||
            self.stats.append(stats)
 | 
			
		||||
            position += self.sample_length
 | 
			
		||||
        print()
 | 
			
		||||
 | 
			
		||||
    def check (self, name, min_val, max_val):
 | 
			
		||||
        self.good = [ index for index, stats in enumerate(self.stats)
 | 
			
		||||
                      if min_val <= stats.get(name) <= max_val ]
 | 
			
		||||
        self.bad = [ index for index, stats in enumerate(self.stats)
 | 
			
		||||
                      if index not in self.good ]
 | 
			
		||||
        self.resume()
 | 
			
		||||
 | 
			
		||||
    def resume (self):
 | 
			
		||||
        view = lambda array: [
 | 
			
		||||
            'file' if index is 0 else
 | 
			
		||||
            'sample {} (at {} seconds)'.format(index, (index-1) * self.sample_length)
 | 
			
		||||
            for index in array
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        if self.good:
 | 
			
		||||
            print('- good:\033[92m', ', '.join( view(self.good) ), '\033[0m')
 | 
			
		||||
        if self.bad:
 | 
			
		||||
            print('- bad:\033[91m', ', '.join( view(self.bad) ), '\033[0m')
 | 
			
		||||
 | 
			
		||||
class Command (BaseCommand):
 | 
			
		||||
    help = __doc__
 | 
			
		||||
    sounds = None
 | 
			
		||||
 | 
			
		||||
    def add_arguments (self, parser):
 | 
			
		||||
        parser.formatter_class=RawTextHelpFormatter
 | 
			
		||||
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            'files', metavar='FILE', type=str, nargs='+',
 | 
			
		||||
            help='file(s) to analyse'
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            '-s', '--sample_length', type=int, default=120,
 | 
			
		||||
            help='size of sample to analyse in seconds. If not set (or 0), does'
 | 
			
		||||
                 ' not analyse by sample',
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            '-a', '--attribute', type=str,
 | 
			
		||||
            help='attribute name to use to check, that can be:\n' + \
 | 
			
		||||
                 ', '.join([ '"{}"'.format(attr) for attr in Stats.attributes ])
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            '-r', '--range', type=float, nargs=2,
 | 
			
		||||
            help='range of minimal and maximal accepted value such as: ' \
 | 
			
		||||
                 '--range min max'
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            '-i', '--resume', action='store_true',
 | 
			
		||||
            help='print a resume of good and bad files'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def handle (self, *args, **options):
 | 
			
		||||
        # parameters
 | 
			
		||||
        minmax = options.get('range')
 | 
			
		||||
        if not minmax:
 | 
			
		||||
            raise CommandError('no range specified')
 | 
			
		||||
 | 
			
		||||
        attr = options.get('attribute')
 | 
			
		||||
        if not attr:
 | 
			
		||||
            raise CommandError('no attribute specified')
 | 
			
		||||
 | 
			
		||||
        # sound analyse and checks
 | 
			
		||||
        self.sounds = [ Sound(path, options.get('sample_length'))
 | 
			
		||||
                        for path in options.get('files') ]
 | 
			
		||||
        self.bad = []
 | 
			
		||||
        self.good = []
 | 
			
		||||
        for sound in self.sounds:
 | 
			
		||||
            print(sound.path)
 | 
			
		||||
            sound.analyse()
 | 
			
		||||
            sound.check(attr, minmax[0], minmax[1])
 | 
			
		||||
            print()
 | 
			
		||||
            if sound.bad:
 | 
			
		||||
                self.bad.append(sound)
 | 
			
		||||
            else:
 | 
			
		||||
                self.good.append(sound)
 | 
			
		||||
 | 
			
		||||
        # resume
 | 
			
		||||
        if options.get('resume'):
 | 
			
		||||
            if self.good:
 | 
			
		||||
                print('files that did not failed the test:\033[92m\n   ',
 | 
			
		||||
                      '\n    '.join([sound.path for sound in self.good]), '\033[0m')
 | 
			
		||||
            if self.bad:
 | 
			
		||||
                # bad at the end for ergonomy
 | 
			
		||||
                print('files that failed the test:\033[91m\n   ',
 | 
			
		||||
                      '\n    '.join([sound.path for sound in self.bad]),'\033[0m')
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user