130 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			130 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Provide sound analysis class using Sox."""
 | 
						|
import logging
 | 
						|
import re
 | 
						|
import subprocess
 | 
						|
 | 
						|
# Module-level logger; records are emitted under the "aircox.commands" name.
logger = logging.getLogger("aircox.commands")
 | 
						|
 | 
						|
 | 
						|
# Public names exported by a star-import of this module.
__all__ = ("SoxStats", "SoundStats")
 | 
						|
 | 
						|
 | 
						|
class SoxStats:
    """Run Sox process and parse output.

    The ``stats`` effect of sox is run on a file (or on an excerpt of it)
    and the figures listed in ``attributes`` are extracted from its report.
    Parsed figures are stored in ``values`` and read through ``get``.
    """

    # Labels searched for in the sox report. Each one is used as the start
    # of a regular expression in ``parse``.
    attributes = [
        "DC offset",
        "Min level",
        "Max level",
        "Pk lev dB",
        "RMS lev dB",
        "RMS Pk dB",
        "RMS Tr dB",
        "Flat factor",
        "Length s",
    ]

    # Dict of parsed figures; stays ``None`` until ``analyse`` has run.
    values = None

    def __init__(self, path=None, **kwargs):
        """If path is given, call analyse with path and kwargs."""
        if path:
            self.analyse(path, **kwargs)

    def analyse(self, path, at=None, length=None):
        """Run ``sox <path> -n [trim <at> <length>] stats`` and store results.

        If at and length are both given, use them as the excerpt to analyse;
        otherwise the whole file is analysed. The parsed report is stored
        in ``self.values``.
        """
        args = ["sox", path, "-n"]
        if at is not None and length is not None:
            args += ["trim", str(at), str(length)]
        args.append("stats")

        p = subprocess.Popen(
            args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # The report of the ``stats`` effect is written to stderr, not
        # stdout, so only the second stream is of interest.
        _, report = p.communicate()
        # Diagnostics may echo a file name that is not valid UTF-8; replace
        # undecodable bytes instead of failing the whole analysis.
        self.values = self.parse(
            str(report, encoding="utf-8", errors="replace")
        )

    def parse(self, output):
        """Parse sox output and return the values found in it.

        For each label of ``attributes`` found in ``output``, the first
        non-blank token following it is converted to ``float`` (``None``
        when the conversion fails). Labels that are not found are left out
        of the result. The "Length s" figure is always present, renamed to
        ``"length"`` (``None`` when it was not found).
        """
        values = {}
        for attr in self.attributes:
            match = re.search(attr + r"\s+(?P<value>\S+)", output)
            if match is None:
                continue
            try:
                values[attr] = float(match.group("value"))
            except ValueError:
                values[attr] = None
        values["length"] = values.pop("Length s", None)
        return values

    def get(self, attr):
        """Return the parsed value for ``attr``, or ``None`` if unavailable.

        ``None`` is also returned when no analysis has been run yet.
        """
        if self.values is None:
            return None
        return self.values.get(attr)
 | 
						|
 | 
						|
 | 
						|
class SoundStats:
    """Analyse a sound file as a whole and by consecutive samples.

    After ``analyse``, ``stats[0]`` holds the statistics of the complete
    file and the following items those of each ``sample_length``-second
    sample. ``check`` then sorts the indexes of ``stats`` into ``good``
    and ``bad``.
    """

    path = None  # file path
    sample_length = 120  # default sample length in seconds
    stats = None  # list of samples statistics
    bad = None  # list of bad samples
    good = None  # list of good samples

    def __init__(self, path, sample_length=None):
        """Store ``path``; override the default sample length if given."""
        self.path = path
        if sample_length is not None:
            self.sample_length = sample_length

    def get_file_stats(self):
        """Return the whole-file statistics, or ``None`` if not analysed."""
        return self.stats[0] if self.stats else None

    def analyse(self):
        """Analyse the complete file, then each consecutive sample.

        Sample analysis is skipped when ``sample_length`` is unset, zero or
        negative, or when the length of the file could not be read from the
        whole-file analysis.
        """
        logger.debug("complete file analysis")
        self.stats = [SoxStats(self.path)]
        length = self.stats[0].get("length")

        # A negative step would never reach the end of the file.
        if not self.sample_length or self.sample_length < 0:
            return
        # Without a known length there is no bound for the sample loop.
        if length is None:
            logger.debug("unknown file length, skip samples analysis")
            return

        logger.debug("start samples analysis...")
        position = 0
        while position < length:
            stats = SoxStats(self.path, at=position, length=self.sample_length)
            self.stats.append(stats)
            position += self.sample_length

    def check(self, name, min_val, max_val):
        """Split ``stats`` indexes into ``good`` and ``bad``, then log them.

        An item is good when its ``name`` value lies within
        ``[min_val, max_val]``. Items whose value is missing (``None``) are
        reported as bad.
        """
        good, bad = [], []
        for index, stats in enumerate(self.stats or ()):
            value = stats.get(name)
            in_range = value is not None and min_val <= value <= max_val
            (good if in_range else bad).append(index)
        self.good, self.bad = good, bad
        self.resume()

    def resume(self):
        """Log at debug level the good and the bad items, when any."""
        # The path is passed as a logging argument rather than concatenated
        # into the format string, so a "%" in it cannot break formatting.
        if self.good:
            logger.debug(
                "%s -> good: \033[92m%s\033[0m",
                self.path,
                ", ".join(self._view(self.good)),
            )
        if self.bad:
            logger.debug(
                "%s -> bad: \033[91m%s\033[0m",
                self.path,
                ", ".join(self._view(self.bad)),
            )

    def _view(self, array):
        """Return a readable label for each ``stats`` index of ``array``.

        Index 0 is the whole file; index ``n`` is the sample starting at
        ``(n - 1) * sample_length`` seconds.
        """
        labels = []
        for index in array:
            if index == 0:
                labels.append("file")
            else:
                labels.append(
                    "sample {} (at {} seconds)".format(
                        index, (index - 1) * self.sample_length
                    )
                )
        return labels
 |