"""Provide sound analysis class using Sox.""" import logging import re import subprocess logger = logging.getLogger("aircox.commands") __all__ = ("SoxStats", "SoundStats") class SoxStats: """Run Sox process and parse output.""" attributes = [ "DC offset", "Min level", "Max level", "Pk lev dB", "RMS lev dB", "RMS Pk dB", "RMS Tr dB", "Flat factor", "Length s", ] values = None def __init__(self, path=None, **kwargs): """If path is given, call analyse with path and kwargs.""" if path: self.analyse(path, **kwargs) def analyse(self, path, at=None, length=None): """If at and length are given use them as excerpt to analyse.""" args = ["sox", path, "-n"] if at is not None and length is not None: args += ["trim", str(at), str(length)] args.append("stats") p = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # sox outputs to stderr (my god WHYYYY) out_, out = p.communicate() self.values = self.parse(str(out, encoding="utf-8")) def parse(self, output): """Parse sox output, settubg values from it.""" values = {} for attr in self.attributes: value = re.search(attr + r"\s+(?P\S+)", output) value = value and value.groupdict() if value: try: value = float(value.get("value")) except ValueError: value = None values[attr] = value values["length"] = values.pop("Length s", None) return values def get(self, attr): return self.values.get(attr) class SoundStats: path = None # file path sample_length = 120 # default sample length in seconds stats = None # list of samples statistics bad = None # list of bad samples good = None # list of good samples def __init__(self, path, sample_length=None): self.path = path if sample_length is not None: self.sample_length = sample_length def get_file_stats(self): return self.stats and self.stats[0] or None def analyse(self): logger.debug("complete file analysis") self.stats = [SoxStats(self.path)] position = 0 length = self.stats[0].get("length") if not self.sample_length: return logger.debug("start samples analysis...") while position < length: stats = SoxStats(self.path, at=position, length=self.sample_length) self.stats.append(stats) position += self.sample_length def check(self, name, min_val, max_val): self.good = [ index for index, stats in enumerate(self.stats) if min_val <= stats.get(name) <= max_val ] self.bad = [ index for index, stats in enumerate(self.stats) if index not in self.good ] self.resume() def resume(self): if self.good: logger.debug( self.path + " -> good: \033[92m%s\033[0m", ", ".join(self._view(self.good)), ) if self.bad: logger.debug( self.path + " -> bad: \033[91m%s\033[0m", ", ".join(self._view(self.bad)), ) def _view(self, array): return [ "file" if index == 0 else "sample {} (at {} seconds)".format( index, (index - 1) * self.sample_length ) for index in array ]