# Forked from rc/aircox
#
# cfr #121
# Co-authored-by: Christophe Siraut <d@tobald.eu.org>
# Co-authored-by: bkfox <thomas bkfox net>
# Co-authored-by: Thomas Kairos <thomas@bkfox.net>
# Reviewed-on: rc/aircox#131
# Co-authored-by: Chris Tactic <ctactic@noreply.git.radiocampus.be>
# Co-committed-by: Chris Tactic <ctactic@noreply.git.radiocampus.be>
#
# 181 lines, 5.2 KiB, Python, Executable File
import atexit
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
import signal
 | 
						|
import subprocess
 | 
						|
 | 
						|
import psutil
 | 
						|
from django.template.loader import render_to_string
 | 
						|
 | 
						|
from ..conf import settings
 | 
						|
from ..connector import Connector
 | 
						|
from .sources import PlaylistSource, QueueSource
 | 
						|
 | 
						|
 | 
						|
# Names exported by a star-import of this module: only the Streamer class.
__all__ = ("Streamer",)
 | 
						|
 | 
						|
# Logger used by this module, registered under the "aircox" name.
logger = logging.getLogger("aircox")
 | 
						|
 | 
						|
 | 
						|
class Streamer:
    """Control the liquidsoap process and the sources of one station.

    A streamer renders the liquidsoap configuration of its station,
    starts and stops the liquidsoap process, and exchanges commands
    with it through a connector in order to sync and monitor sources.
    """

    connector = None
    """Connector used to send commands to liquidsoap."""
    process = None
    """``subprocess.Popen`` started by :meth:`run_process`, or None."""

    station = None
    """Station this streamer works for."""
    template_name = "aircox_streamer/scripts/station.liq"
    """Template rendered into the liquidsoap configuration file."""
    path = None
    """Config path."""
    sources = None
    """List of all monitored sources."""
    source = None
    """Current source being played on air."""
    # note: we disable on_air rids since we don't have use of it for the
    # moment
    # on_air = None
    # """ On-air request ids (rid) """
    inputs = None
    """Queryset to input ports."""
    outputs = None
    """Queryset to output ports."""
    kill_timeout = 5
    """Seconds :meth:`kill_process` waits for the killed process to be
    reaped before giving up."""

    def __init__(self, station, connector=None):
        """Initialise the streamer for ``station``.

        :param station: station to stream; its active input and output \
            ports and its stream programs are read from it.
        :param connector: connector to use; when none is given, one is \
            created on the station's ``station.sock`` path.
        """
        self.station = station
        self.inputs = self.station.port_set.active().input()
        self.outputs = self.station.port_set.active().output()

        # Identifier derived from the station slug, with dashes replaced
        # by underscores.
        self.id = self.station.slug.replace("-", "_")
        self.path = settings.get_dir(station, "station.liq")
        self.connector = connector or Connector(settings.get_dir(station, "station.sock"))
        self.init_sources()

    @property
    def socket_path(self):
        """Path to Unix socket file."""
        return self.connector.address

    @property
    def is_ready(self):
        """If external program is ready to use, returns True."""
        return self.send("list") != ""

    @property
    def is_running(self):
        """True if holds a running process.

        When the held process has terminated, it is forgotten
        (``self.process`` is reset to None) and False is returned.
        """
        if self.process is None:
            return False

        returncode = self.process.poll()
        if returncode is None:
            return True

        self.process = None
        # Lazy logger arguments: the message is only formatted when the
        # debug level is enabled.
        logger.debug("process died with return code %s", returncode)
        return False

    @property
    def playlists(self):
        """Generator over the monitored sources that are playlists."""
        return (s for s in self.sources if isinstance(s, PlaylistSource))

    @property
    def queues(self):
        """Generator over the monitored sources that are queues."""
        return (s for s in self.sources if isinstance(s, QueueSource))

    # Sources and config ###############################################
    def send(self, *args, **kwargs):
        """Send a command through the connector.

        Arguments are passed as is to the connector's ``send``. Returns
        its result, or an empty string when that result is falsy.
        """
        return self.connector.send(*args, **kwargs) or ""

    def init_sources(self):
        """Create the dealer queue and one playlist source per stream
        program of the station."""
        streams = self.station.program_set.filter(stream__isnull=False)
        self.dealer = QueueSource(self, "dealer")
        self.sources = [self.dealer] + [PlaylistSource(self, program=program) for program in streams]

    def make_config(self):
        """Make configuration files and directory (and sync sources)"""
        data = render_to_string(
            self.template_name,
            {
                "dir": settings.get_dir(self.station),
                "log_file": settings.get_dir(self.station, "liquidsoap.log"),
                "station": self.station,
                "streamer": self,
            },
        )
        # Strip trailing whitespace and collapse runs of blank lines left
        # over by the template rendering.
        data = re.sub("[\t ]+\n", "\n", data)
        data = re.sub("\n{3,}", "\n\n", data)

        os.makedirs(os.path.dirname(self.path), exist_ok=True)
        # The file is only written to; an explicit encoding keeps the
        # output independent from the locale of the running process.
        with open(self.path, "w", encoding="utf-8") as file:
            file.write(data)

        self.sync()

    def sync(self):
        """Sync all sources."""
        for source in self.sources:
            source.sync()

    def fetch(self):
        """Fetch data from liquidsoap.

        Every source is fetched, then ``self.source`` is set to the
        playing source with the latest ``air_time`` (None if no source
        is playing).
        """
        for source in self.sources:
            source.fetch()

        # request.on_air is not ordered: we need to do it manually
        playing = (source for source in self.sources if source.request_status == "playing" and source.air_time)
        # On equal air times ``max`` keeps the first source, as a stable
        # descending sort would.
        self.source = max(playing, key=lambda source: source.air_time, default=None)

    # Process ##########################################################
    def get_process_args(self):
        """Return the command line used to run liquidsoap on the config."""
        return ["liquidsoap", "-v", self.path]

    def check_zombie_process(self):
        """Kill processes that still hold the streamer's Unix socket.

        Nothing is done when the socket file does not exist. Otherwise
        every process owning a Unix socket whose local address is the
        socket path is sent SIGKILL.
        """
        if not os.path.exists(self.socket_path):
            return

        # A single process may own several matching sockets: kill each
        # pid only once.
        pids = {
            conn.pid
            for conn in psutil.net_connections(kind="unix")
            if conn.laddr == self.socket_path and conn.pid is not None
        }
        for pid in pids:
            try:
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                # The process exited between the listing and the kill.
                logger.debug("process %s already exited", pid)

    def run_process(self):
        """Execute the external application with corresponding informations.

        This function must make sure that all needed files have been
        generated.

        Nothing is done when a process is already held or when
        :meth:`get_process_args` returns no arguments. The started
        process is registered to be killed at interpreter exit.
        """
        if self.process:
            return

        args = self.get_process_args()
        if not args:
            return

        self.check_zombie_process()
        self.process = subprocess.Popen(args, stderr=subprocess.STDOUT)
        atexit.register(self.kill_process)

    def kill_process(self):
        """Kill the held process, if any, and forget it.

        The killed process is waited for (at most ``kill_timeout``
        seconds) so that it does not linger as a defunct child.
        """
        if self.process:
            logger.debug(
                "kill process %s: %s",
                self.process.pid,
                " ".join(self.get_process_args()),
            )
            self.process.kill()
            try:
                # Reap the child in order to release its process entry.
                self.process.wait(timeout=self.kill_timeout)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "process %s still not terminated %s seconds after kill",
                    self.process.pid,
                    self.kill_timeout,
                )
            self.process = None
            atexit.unregister(self.kill_process)

    def wait_process(self):
        """Wait for the process to terminate if there is a process."""
        if self.process:
            self.process.wait()
            self.process = None
 |