116 lines
3.5 KiB
Python
116 lines
3.5 KiB
Python
"""Provide sound analysis class using Sox."""
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
|
|
# Public names of this module.
__all__ = (
    "SoxStats",
    "SoundStats",
)

# Logger used by the classes below for their debug output.
logger = logging.getLogger("aircox.commands")
|
|
|
|
|
|
class SoxStats:
    """Run Sox process and parse output.

    Once :meth:`analyse` has run, ``values`` maps every label listed in
    ``attributes`` to a float, or to ``None`` when the label was not found
    or its value was not numeric. The ``"Length s"`` entry is stored under
    the key ``"length"`` instead.
    """

    # Labels looked up in the output of sox's ``stats``.
    attributes = [
        "DC offset",
        "Min level",
        "Max level",
        "Pk lev dB",
        "RMS lev dB",
        "RMS Pk dB",
        "RMS Tr dB",
        "Flat factor",
        "Length s",
    ]

    # Parsed statistics; stays None until analyse() has been called.
    values = None

    def __init__(self, path=None, **kwargs):
        """If path is given, call analyse with path and kwargs."""
        if path:
            self.analyse(path, **kwargs)

    def analyse(self, path, at=None, length=None):
        """Run sox on ``path`` and store the parsed result in ``values``.

        If both ``at`` and ``length`` are given, they are passed to sox as
        ``trim <at> <length>`` so that only this excerpt is analysed.
        """
        args = ["sox", path, "-n"]
        if at is not None and length is not None:
            args += ["trim", str(at), str(length)]
        args.append("stats")

        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # sox writes the statistics to stderr, not stdout.
        _, err = p.communicate()
        # Undecodable bytes (e.g. from a file name echoed by sox) must not
        # abort the analysis, hence ``errors="replace"``.
        self.values = self.parse(str(err, encoding="utf-8", errors="replace"))

    def parse(self, output):
        """Parse sox output and return the values found in it.

        :param output: text written by sox.
        :return: dict with one entry per label of ``attributes`` (float, or \
            ``None`` when missing or not a number); ``"Length s"`` is \
            renamed to ``"length"``.
        """
        values = {}
        for attr in self.attributes:
            # Labels are literal text: escape them before building the regex.
            match = re.search(re.escape(attr) + r"\s+(?P<value>\S+)", output)
            value = None
            if match:
                try:
                    value = float(match.group("value"))
                except ValueError:
                    # Matched something that is not a number.
                    value = None
            values[attr] = value
        values["length"] = values.pop("Length s", None)
        return values

    def get(self, attr):
        """Return the parsed value of ``attr``.

        Return ``None`` when the attribute is unknown or when no analysis
        has been run yet.
        """
        if self.values is None:
            return None
        return self.values.get(attr)
|
|
|
|
|
|
class SoundStats:
    """Analyse a sound file as a whole and by samples of fixed length.

    After :meth:`analyse`, ``stats[0]`` holds the statistics of the whole
    file and each following item those of one sample. :meth:`check` then
    sorts the indexes of ``stats`` into ``good`` and ``bad``.
    """

    path = None  # file path
    sample_length = 120  # default sample length in seconds
    stats = None  # list of samples statistics
    bad = None  # list of bad samples
    good = None  # list of good samples

    def __init__(self, path, sample_length=None):
        """Keep ``path``; override the default sample length if given."""
        self.path = path
        if sample_length is not None:
            self.sample_length = sample_length

    def get_file_stats(self):
        """Return the statistics of the whole file, or None if there are none."""
        return self.stats[0] if self.stats else None

    def analyse(self):
        """Analyse the whole file, then each sample of ``sample_length``.

        Samples are skipped when ``sample_length`` is falsy or when no
        length could be read for the file.
        """
        logger.debug("complete file analysis")
        self.stats = [SoxStats(self.path)]
        if not self.sample_length:
            return

        length = self.stats[0].get("length")
        if length is None:
            # Without a length there is nothing to compare positions with.
            logger.debug("no length found for %s, samples analysis skipped", self.path)
            return

        logger.debug("start samples analysis...")
        position = 0
        while position < length:
            stats = SoxStats(self.path, at=position, length=self.sample_length)
            self.stats.append(stats)
            position += self.sample_length

    def check(self, name, min_val, max_val):
        """Split indexes of ``stats`` into ``good`` and ``bad``, then log them.

        An item is good when its ``name`` value lies within
        ``[min_val, max_val]``; a missing (``None``) value counts as bad.
        """
        good, bad = [], []
        for index, stats in enumerate(self.stats):
            value = stats.get(name)
            if value is not None and min_val <= value <= max_val:
                good.append(index)
            else:
                bad.append(index)
        self.good, self.bad = good, bad
        self.resume()

    def resume(self):
        """Log, at debug level, which items are good and which are bad."""
        # The path is passed as a logging argument, not merged into the
        # format string: a "%" in the file name would break formatting.
        if self.good:
            logger.debug(
                "%s -> good: \033[92m%s\033[0m",
                self.path,
                ", ".join(self._view(self.good)),
            )
        if self.bad:
            logger.debug(
                "%s -> bad: \033[91m%s\033[0m",
                self.path,
                ", ".join(self._view(self.bad)),
            )

    def _view(self, array):
        """Return a readable label for each index of ``array``.

        Index 0 is the whole file; index ``i > 0`` is the sample starting
        at ``(i - 1) * sample_length`` seconds.
        """
        return [
            "file" if index == 0 else "sample {} (at {} seconds)".format(index, (index - 1) * self.sample_length)
            for index in array
        ]
|