aircox-radiocampus/aircox/controllers/sound_stats.py
Thomas Kairos f7a61fe6c0 Feat: packaging (#127)
- Add configuration files for packaging
- Precommit now uses ruff

Co-authored-by: bkfox <thomas bkfox net>
Reviewed-on: rc/aircox#127
2023-10-11 10:58:34 +02:00

116 lines
3.5 KiB
Python

"""Provide sound analysis class using Sox."""
import logging
import re
import subprocess
# Module-level logger; records are emitted under the "aircox.commands" name.
logger = logging.getLogger("aircox.commands")
# Public names exported by ``from <module> import *``.
__all__ = ("SoxStats", "SoundStats")
class SoxStats:
    """Run Sox process and parse output.

    The ``sox <path> -n [trim <at> <length>] stats`` command is executed and
    the statistics it prints are stored in ``values``, a dict keyed by the
    labels listed in ``attributes`` (``"Length s"`` is exposed as
    ``"length"``). A statistic that is missing from the output, or whose
    value is not a number, is stored as ``None``.
    """

    # Labels looked up in sox output; each becomes a key of ``values``.
    attributes = [
        "DC offset",
        "Min level",
        "Max level",
        "Pk lev dB",
        "RMS lev dB",
        "RMS Pk dB",
        "RMS Tr dB",
        "Flat factor",
        "Length s",
    ]
    # Parsed statistics; stays None until `analyse()` has been run.
    values = None

    def __init__(self, path=None, **kwargs):
        """If path is given, call analyse with path and kwargs."""
        if path:
            self.analyse(path, **kwargs)

    def analyse(self, path, at=None, length=None):
        """Run sox on ``path`` and store the parsed statistics in ``values``.

        If both ``at`` and ``length`` are given, they are passed to the sox
        ``trim`` effect so that only this excerpt is analysed.
        """
        args = ["sox", path, "-n"]
        if at is not None and length is not None:
            args += ["trim", str(at), str(length)]
        args.append("stats")

        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # sox prints the statistics on stderr; stdout is drained and ignored.
        _, err = p.communicate()
        # errors="replace": one undecodable byte in sox messages must not
        # abort the whole analysis; labels and numbers are plain ASCII.
        self.values = self.parse(str(err, encoding="utf-8", errors="replace"))

    def parse(self, output):
        """Parse sox output and return the values read from it.

        For each label of ``attributes`` the first whitespace-delimited token
        following the label is converted to ``float``; the entry is ``None``
        when the label is absent or the token is not a number. The
        ``"Length s"`` entry is returned under the ``"length"`` key instead.
        """
        values = {}
        for attr in self.attributes:
            # Escape the label so it is always matched literally.
            match = re.search(re.escape(attr) + r"\s+(?P<value>\S+)", output)
            value = None
            if match:
                try:
                    value = float(match.group("value"))
                except ValueError:
                    value = None
            values[attr] = value
        values["length"] = values.pop("Length s", None)
        return values

    def get(self, attr):
        """Return the statistic ``attr``, or None if unknown or not analysed."""
        if self.values is None:
            return None
        return self.values.get(attr)
class SoundStats:
    """Analyse a sound file as a whole and by successive samples.

    ``analyse()`` fills ``stats`` with one ``SoxStats`` for the whole file
    (index 0) followed by one per excerpt of ``sample_length`` seconds.
    ``check()`` then sorts the indexes of ``stats`` into ``good`` and ``bad``
    depending on whether a statistic lies within a given range.
    """

    path = None  # file path
    sample_length = 120  # default sample length in seconds
    stats = None  # list of samples statistics
    bad = None  # list of bad samples
    good = None  # list of good samples

    def __init__(self, path, sample_length=None):
        """Store ``path``; override the default sample length when given."""
        self.path = path
        if sample_length is not None:
            self.sample_length = sample_length

    def get_file_stats(self):
        """Return statistics of the whole file, or None if not analysed."""
        return self.stats[0] if self.stats else None

    def analyse(self):
        """Analyse the whole file, then each ``sample_length`` long excerpt.

        Samples analysis is skipped when ``sample_length`` is unset, zero or
        negative, and when the file length could not be read from sox output.
        """
        logger.debug("complete file analysis")
        self.stats = [SoxStats(self.path)]

        # A negative step would never reach the end of the file.
        if not self.sample_length or self.sample_length < 0:
            return

        length = self.stats[0].get("length")
        if length is None:
            # Without a length there is no way to know where samples end.
            logger.warning("%s: file length is unknown, skip samples analysis", self.path)
            return

        logger.debug("start samples analysis...")
        position = 0
        while position < length:
            stats = SoxStats(self.path, at=position, length=self.sample_length)
            self.stats.append(stats)
            position += self.sample_length

    def check(self, name, min_val, max_val):
        """Sort ``stats`` indexes into ``good`` and ``bad`` and log the result.

        An entry is good when its ``name`` statistic is within
        ``[min_val, max_val]``; it is bad otherwise, including when the
        statistic is missing. Nothing is sorted if no analysis has been run.
        """
        good, bad = [], []
        for index, stats in enumerate(self.stats or ()):
            value = stats.get(name)
            if value is not None and min_val <= value <= max_val:
                good.append(index)
            else:
                bad.append(index)
        self.good, self.bad = good, bad
        self.resume()

    def resume(self):
        """Log the good and the bad entries found by the last ``check()``."""
        # Path is passed as a logging argument, not concatenated into the
        # format string: a "%" in the file name would break formatting.
        if self.good:
            logger.debug(
                "%s -> good: \033[92m%s\033[0m",
                self.path,
                ", ".join(self._view(self.good)),
            )
        if self.bad:
            logger.debug(
                "%s -> bad: \033[91m%s\033[0m",
                self.path,
                ", ".join(self._view(self.bad)),
            )

    def _view(self, array):
        """Return a readable label for each ``stats`` index of ``array``.

        Index 0 is the whole file; index ``n`` is the sample starting at
        ``(n - 1) * sample_length`` seconds.
        """
        return [
            "file" if index == 0 else "sample {} (at {} seconds)".format(index, (index - 1) * self.sample_length)
            for index in array
        ]