From 257fb6a5390d268bdb363ced5d0bd29d24833593 Mon Sep 17 00:00:00 2001 From: bkfox Date: Thu, 25 May 2023 10:03:11 +0200 Subject: [PATCH] stash --- .../aircox/locale/fr/LC_MESSAGES/django.mo | Bin 379 -> 0 bytes .../aircox/locale/fr/LC_MESSAGES/django.po | 130 ------------ aircox_streamer/connector.py | 27 ++- aircox_streamer/controllers/__init__.py | 9 + aircox_streamer/controllers/base.py | 90 ++++++++ .../sources.py} | 14 +- aircox_streamer/controllers/streamer.py | 192 ++++++++++++++++++ aircox_streamer/tests/conftest.py | 51 +++++ aircox_streamer/tests/test_connector.py | 67 ++++++ .../tests/test_controllers_base.py | 35 ++++ .../tests/test_controllers_sources.py | 51 +++++ .../tests/test_controllers_streamer.py | 50 +++++ 12 files changed, 570 insertions(+), 146 deletions(-) delete mode 100644 aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.mo delete mode 100644 aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po create mode 100644 aircox_streamer/controllers/__init__.py create mode 100755 aircox_streamer/controllers/base.py rename aircox_streamer/{controllers.py => controllers/sources.py} (97%) create mode 100755 aircox_streamer/controllers/streamer.py create mode 100644 aircox_streamer/tests/conftest.py create mode 100644 aircox_streamer/tests/test_connector.py create mode 100644 aircox_streamer/tests/test_controllers_base.py create mode 100644 aircox_streamer/tests/test_controllers_sources.py create mode 100644 aircox_streamer/tests/test_controllers_streamer.py diff --git a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.mo b/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.mo deleted file mode 100644 index 2c90dd0c81aca562856271a6885816b565885734..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 379 zcmYL@y-ve06h@0+%E-)c2L=$hP^RMksHSmCgq=ibS_wAOnqZXLk?kUQ5MGaG!2yYW z(vk0#Kg;L)_~@$>au7HPoCXd9mw`6@01213;cPqq$*p;lYmbr*T1o4a(HL?veIRoR zD_Sg)ER71;80!&tmD-@YUFA?|FhqHV3i+e|LVdy_A hALaGViW<#~-8u}q`CZ-UW&nTV=uE>Hdgp_v^8-$QX_^24 diff --git a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po b/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po deleted file mode 100644 index a7628cc..0000000 --- a/aircox_streamer/aircox/locale/fr/LC_MESSAGES/django.po +++ /dev/null @@ -1,130 +0,0 @@ -# SOME DESCRIPTIVE TITLE. -# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER -# This file is distributed under the same license as the PACKAGE package. -# FIRST AUTHOR , YEAR. -# -#, fuzzy -msgid "" -msgstr "" -"Project-Id-Version: PACKAGE VERSION\n" -"Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2020-01-06 14:14+0100\n" -"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" -"Last-Translator: FULL NAME \n" -"Language-Team: LANGUAGE \n" -"Language: \n" -"MIME-Version: 1.0\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Content-Transfer-Encoding: 8bit\n" -"Plural-Forms: nplurals=2; plural=(n > 1);\n" - -#: models.py:37 -msgid "input" -msgstr "" - -#: models.py:38 -msgid "output" -msgstr "" - -#: models.py:56 -msgid "station" -msgstr "" - -#: models.py:58 -msgid "direction" -msgstr "" - -#: models.py:59 -msgid "type" -msgstr "" - -#: models.py:61 -msgid "active" -msgstr "" - -#: models.py:62 -msgid "this port is active" -msgstr "" - -#: models.py:65 -msgid "port settings" -msgstr "" - -#: models.py:66 -msgid "" -"list of comma separated params available; this is put in the output config " -"file as raw code; plugin related" -msgstr "" - -#: templates/aircox_streamer/source_item.html:19 -msgid "Synchronize source with Liquidsoap" -msgstr "" - -#: templates/aircox_streamer/source_item.html:23 -msgid "Synchronise" -msgstr "" - -#: templates/aircox_streamer/source_item.html:26 -msgid "Restart current track" -msgstr "" - -#: templates/aircox_streamer/source_item.html:30 -msgid "Restart" -msgstr "" - -#: templates/aircox_streamer/source_item.html:33 -msgid "Skip current file" -msgstr "" - -#: templates/aircox_streamer/source_item.html:34 -msgid "Skip" -msgstr "" - -#: templates/aircox_streamer/source_item.html:43 -msgid "Add sound" -msgstr "" - -#: templates/aircox_streamer/source_item.html:51 -msgid "Select a sound" -msgstr "" - -#: templates/aircox_streamer/source_item.html:53 -msgid "Add a sound to the queue (queue may start playing)" -msgstr "" - -#: templates/aircox_streamer/source_item.html:62 -msgid "Add" -msgstr "" - -#: templates/aircox_streamer/source_item.html:68 -msgid "Sounds in queue" -msgstr "" - -#: templates/aircox_streamer/source_item.html:86 -msgid "Status" -msgstr "" - -#: templates/aircox_streamer/source_item.html:96 -msgid "Air time" -msgstr "" - -#: templates/aircox_streamer/source_item.html:106 -msgid "Time left" -msgstr "" - -#: templates/aircox_streamer/source_item.html:114 -msgid "Data source" -msgstr "" - -#: templates/aircox_streamer/streamer.html:19 -msgid "Reload" -msgstr "" - -#: templates/aircox_streamer/streamer.html:26 -#: templates/aircox_streamer/streamer.html:27 -msgid "Select a station" -msgstr "" - -#: urls.py:9 views.py:9 -msgid "Streamer Monitor" -msgstr "" diff --git a/aircox_streamer/connector.py b/aircox_streamer/connector.py index 5e4b5ef..18925dd 100755 --- a/aircox_streamer/connector.py +++ b/aircox_streamer/connector.py @@ -12,6 +12,8 @@ class Connector: Received data can be parsed from list of `key=value` or JSON. """ + socket_class = socket.socket + """Socket class to instanciate on open.""" socket = None """The socket.""" address = None @@ -26,27 +28,42 @@ class Connector: if address: self.address = address + def __enter__(self): + r = self.open() + if r == -1: + raise RuntimeError("can not open socket.") + return self + + def __exit__(self): + self.close() + def open(self): + """Open connection. + + :return: 0 (success), 1 (already opened), -1 (failure) + """ if self.is_open: - return + return 1 family = ( socket.AF_UNIX if isinstance(self.address, str) else socket.AF_INET ) try: - self.socket = socket.socket(family, socket.SOCK_STREAM) + self.socket = self.socket_class(family, socket.SOCK_STREAM) self.socket.connect(self.address) + return 0 except Exception: self.close() return -1 def close(self): - self.socket.close() - self.socket = None + if self.is_open: + self.socket.close() + self.socket = None # FIXME: return None on failed def send(self, *data, try_count=1, parse=False, parse_json=False): - if self.open(): + if self.open() == -1: return None data = bytes("".join([str(d) for d in data]) + "\n", encoding="utf-8") diff --git a/aircox_streamer/controllers/__init__.py b/aircox_streamer/controllers/__init__.py new file mode 100644 index 0000000..5ba80f1 --- /dev/null +++ b/aircox_streamer/controllers/__init__.py @@ -0,0 +1,9 @@ +# TODO: for the moment, update in station and program names do not update the +# related fields. + +from .base import Request +from .streamer import Streamer +from .sources import Source, PlaylistSource, QueueSource + + +__all__ = ("Request", "Streamer", "Source", "PlaylistSource", "QueueSource") diff --git a/aircox_streamer/controllers/base.py b/aircox_streamer/controllers/base.py new file mode 100755 index 0000000..9037bbf --- /dev/null +++ b/aircox_streamer/controllers/base.py @@ -0,0 +1,90 @@ +import logging + +import tzlocal +from django.utils import timezone as tz +from django.utils.translation import gettext_lazy as _ + + +__all__ = ( + "BaseMetadata", + "Request", +) + +local_tz = tzlocal.get_localzone() +logger = logging.getLogger("aircox") + + +# FIXME liquidsoap does not manage timezones -- we have to convert +# 'on_air' metadata we get from it into utc one in order to work +# correctly. + + +class BaseMetadata: + """Base class for handling request metadata.""" + + controller = None + """Controller.""" + rid = None + """Request id.""" + uri = None + """Request uri.""" + status = None + """Current playing status.""" + request_status = None + """Requests' status.""" + air_time = None + """Launch datetime.""" + + def __init__(self, controller=None, rid=None, data=None): + self.controller = controller + self.rid = rid + if data is not None: + self.validate(data) + + @property + def is_playing(self): + return self.status == "playing" + + @property + def status_verbose(self): + return self.validate_status(self.status, True) + + def fetch(self): + data = self.controller.send("request.metadata ", self.rid, parse=True) + if data: + self.validate(data) + + def validate_status(self, status, i18n=False): + on_air = self.controller.source + if ( + on_air + and status == "playing" + and (on_air == self or on_air.rid == self.rid) + ): + return _("playing") if i18n else "playing" + elif status == "playing": + return _("paused") if i18n else "paused" + else: + return _("stopped") if i18n else "stopped" + + def validate_air_time(self, air_time): + if air_time: + air_time = tz.datetime.strptime(air_time, "%Y/%m/%d %H:%M:%S") + return local_tz.localize(air_time) + + def validate(self, data): + """Validate provided data and set as attribute (must already be + declared)""" + for key, value in data.items(): + if hasattr(self, key) and not callable(getattr(self, key)): + setattr(self, key, value) + self.uri = data.get("initial_uri") + + self.air_time = self.validate_air_time(data.get("on_air")) + self.status = self.validate_status(data.get("status")) + self.request_status = data.get("status") + + +class Request(BaseMetadata): + title = None + artist = None diff --git a/aircox_streamer/controllers.py b/aircox_streamer/controllers/sources.py similarity index 97% rename from aircox_streamer/controllers.py rename to aircox_streamer/controllers/sources.py index ca1d233..0d928d7 100755 --- a/aircox_streamer/controllers.py +++ b/aircox_streamer/controllers/sources.py @@ -16,21 +16,13 @@ from aircox.utils import to_seconds from .connector import Connector -__all__ = [ - "BaseMetadata", - "Request", - "Streamer", + +__all__ = ( "Source", "PlaylistSource", "QueueSource", -] +) -# TODO: for the moment, update in station and program names do not update the -# related fields. - -# FIXME liquidsoap does not manage timezones -- we have to convert -# 'on_air' metadata we get from it into utc one in order to work -# correctly. local_tz = tzlocal.get_localzone() logger = logging.getLogger("aircox") diff --git a/aircox_streamer/controllers/streamer.py b/aircox_streamer/controllers/streamer.py new file mode 100755 index 0000000..5527176 --- /dev/null +++ b/aircox_streamer/controllers/streamer.py @@ -0,0 +1,192 @@ +import atexit +import logging +import os +import re +import signal +import subprocess + +import psutil +import tzlocal +from django.template.loader import render_to_string + +from aircox.conf import settings + +from ..connector import Connector +from .sources import PlaylistSource, QueueSource + + +__all__ = ("Streamer",) + +local_tz = tzlocal.get_localzone() +logger = logging.getLogger("aircox") + + +class Streamer: + connector = None + process = None + + station = None + template_name = "aircox_streamer/scripts/station.liq" + path = None + """Config path.""" + sources = None + """List of all monitored sources.""" + source = None + """Current source being played on air.""" + # note: we disable on_air rids since we don't have use of it for the + # moment + # on_air = None + # """ On-air request ids (rid) """ + inputs = None + """Queryset to input ports.""" + outputs = None + """Queryset to output ports.""" + + def __init__(self, station, connector=None): + self.station = station + self.inputs = self.station.port_set.active().input() + self.outputs = self.station.port_set.active().output() + + self.id = self.station.slug.replace("-", "_") + self.path = os.path.join(station.path, "station.liq") + self.connector = Connector(os.path.join(station.path, "station.sock")) + self.init_sources() + + @property + def socket_path(self): + """Path to Unix socket file.""" + return self.connector.address + + @property + def is_ready(self): + """If external program is ready to use, returns True.""" + return self.send("list") != "" + + @property + def is_running(self): + """True if holds a running process.""" + if self.process is None: + return False + + returncode = self.process.poll() + if returncode is None: + return True + + self.process = None + logger.debug("process died with return code %s" % returncode) + return False + + @property + def playlists(self): + return (s for s in self.sources if isinstance(s, PlaylistSource)) + + @property + def queues(self): + return (s for s in self.sources if isinstance(s, QueueSource)) + + # Sources and config ############################################### + def send(self, *args, **kwargs): + return self.connector.send(*args, **kwargs) or "" + + def init_sources(self): + streams = self.station.program_set.filter(stream__isnull=False) + self.dealer = QueueSource(self, "dealer") + self.sources = [self.dealer] + [ + PlaylistSource(self, program=program) for program in streams + ] + + def make_config(self): + """Make configuration files and directory (and sync sources)""" + data = render_to_string( + self.template_name, + { + "station": self.station, + "streamer": self, + "settings": settings, + }, + ) + data = re.sub("[\t ]+\n", "\n", data) + data = re.sub("\n{3,}", "\n\n", data) + + os.makedirs(os.path.dirname(self.path), exist_ok=True) + with open(self.path, "w+") as file: + file.write(data) + + self.sync() + + def sync(self): + """Sync all sources.""" + for source in self.sources: + source.sync() + + def fetch(self): + """Fetch data from liquidsoap.""" + for source in self.sources: + source.fetch() + + # request.on_air is not ordered: we need to do it manually + self.source = next( + iter( + sorted( + ( + source + for source in self.sources + if source.request_status == "playing" + and source.air_time + ), + key=lambda o: o.air_time, + reverse=True, + ) + ), + None, + ) + + # Process ########################################################## + def get_process_args(self): + return ["liquidsoap", "-v", self.path] + + def check_zombie_process(self): + if not os.path.exists(self.socket_path): + return + + conns = [ + conn + for conn in psutil.net_connections(kind="unix") + if conn.laddr == self.socket_path + ] + for conn in conns: + if conn.pid is not None: + os.kill(conn.pid, signal.SIGKILL) + + def run_process(self): + """Execute the external application with corresponding informations. + + This function must make sure that all needed files have been + generated. + """ + if self.process: + return + + args = self.get_process_args() + if not args: + return + + self.check_zombie_process() + self.process = subprocess.Popen(args, stderr=subprocess.STDOUT) + atexit.register(lambda: self.kill_process()) + + def kill_process(self): + if self.process: + logger.debug( + "kill process %s: %s", + self.process.pid, + " ".join(self.get_process_args()), + ) + self.process.kill() + self.process = None + + def wait_process(self): + """Wait for the process to terminate if there is a process.""" + if self.process: + self.process.wait() + self.process = None diff --git a/aircox_streamer/tests/conftest.py b/aircox_streamer/tests/conftest.py new file mode 100644 index 0000000..076105a --- /dev/null +++ b/aircox_streamer/tests/conftest.py @@ -0,0 +1,51 @@ +import pytest + +from aircox_streamer import connector + + +class FakeSocket: + FAILING_ADDRESS = -1 + """Connect with this address fails.""" + + family, type, address = None, None, None + sent_data = None + """List of data that have been `send[all]`""" + recv_data = None + """Response data to return on recv.""" + + def __init__(self, family, type): + self.family = family + self.type = type + self.sent_data = [] + + def connect(self, address): + if address == self.FAILING_ADDRESS: + raise RuntimeError("invalid connection") + self.address = address + + def close(self): + pass + + def sendall(self, data): + self.sent_data.append(data) + + def recv(self, count): + data = self.recv_data[:count] + self.recv_data = data[count:] + return data.encode("utf-8") if isinstance(data, str) else data + + +class Connector(connector.Connector): + socket_class = FakeSocket + + +@pytest.fixture +def connector(request): + obj = Connector("test") + yield obj + obj.close() + + +@pytest.fixture +def fail_connector(): + return Connector(FakeSocket.FAILING_ADDRESS) diff --git a/aircox_streamer/tests/test_connector.py b/aircox_streamer/tests/test_connector.py new file mode 100644 index 0000000..e3c601f --- /dev/null +++ b/aircox_streamer/tests/test_connector.py @@ -0,0 +1,67 @@ +import json +import socket + + +from aircox_streamer.connector import Connector + + +class TestConnector: + payload = "non_value_info\n" 'a="value_1"\n' 'b="value_b"\n' "END" + """Test payload.""" + payload_data = {"a": "value_1", "b": "value_b"} + """Resulting data of payload.""" + + def test_open(self, connector): + assert connector.open() == 0 + assert connector.is_open + assert connector.socket.family == socket.AF_UNIX + assert connector.socket.type == socket.SOCK_STREAM + assert connector.socket.address == "test" + connector.close() + + def test_open_af_inet(self): + address = ("test", 30) + connector = Connector(address) + assert connector.open() == 0 + assert connector.is_open + assert connector.socket.family == socket.AF_INET + assert connector.socket.type == socket.SOCK_STREAM + assert connector.socket.address == address + + def test_open_is_already_open(self, connector): + connector.open() + assert connector.open() == 1 + + def test_open_failure(self, fail_connector): + assert fail_connector.open() == -1 + assert fail_connector.socket is None # close() called + + def test_close(self, connector): + connector.open() + assert connector.socket is not None + connector.close() + assert connector.socket is None + + def test_send(self, connector): + connector.socket.recv_data = self.payload + result = connector.send("fake_action", parse=True) + assert result == self.payload_data + + def test_send_open_failure(self, fail_connector): + assert fail_connector.send("fake_action", parse=True) is None + + def test_parse(self, connector): + result = connector.parse(self.payload) + assert result == self.payload_data + + def test_parse_json(self, connector): + # include case where json string is surrounded by '"' + dumps = '"' + json.dumps(self.payload_data) + '"' + result = connector.parse_json(dumps) + assert result == self.payload_data + + def test_parse_json_empty_value(self, connector): + assert connector.parse_json('""') is None + + def test_parse_json_failure(self, connector): + assert connector.parse_json("-- invalid json string --") is None diff --git a/aircox_streamer/tests/test_controllers_base.py b/aircox_streamer/tests/test_controllers_base.py new file mode 100644 index 0000000..157f62c --- /dev/null +++ b/aircox_streamer/tests/test_controllers_base.py @@ -0,0 +1,35 @@ +# import pytest + +# from aircox_streamer import controllers + + +class TestBaseMetaData: + def test_is_playing(self): + pass + + def test_status_verbose(self): + pass + + def test_fetch(self): + pass + + def test_fetch_no_data(self): + pass + + def test_validate_status_playing(self): + pass + + def test_validate_status_paused(self): + pass + + def test_validate_status_stopped(self): + pass + + def test_validate_air_time(self): + pass + + def test_validate_air_time_none(self): + pass + + def test_validate(self): + pass diff --git a/aircox_streamer/tests/test_controllers_sources.py b/aircox_streamer/tests/test_controllers_sources.py new file mode 100644 index 0000000..612c749 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_sources.py @@ -0,0 +1,51 @@ +# import pytest + +# from aircox_streamer import controllers + + +class TestSource: + def test_station(self): + pass + + def test_sync(self): + pass + + def test_fetch(self): + pass + + def test_skip(self): + pass + + def test_restart(self): + pass + + def test_seek(self, n): + pass + + +class TestPlaylistSource: + def test_get_sound_queryset(self): + pass + + def test_get_playlist(self): + pass + + def test_write_playlist(self): + pass + + def test_stream(self): + pass + + def test_sync(self): + pass + + +class TestQueueSource: + def test_push(self): + pass + + def test_fetch(self): + pass + + def test_requests(self): + pass diff --git a/aircox_streamer/tests/test_controllers_streamer.py b/aircox_streamer/tests/test_controllers_streamer.py new file mode 100644 index 0000000..a01d6b3 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_streamer.py @@ -0,0 +1,50 @@ +# import pytest + +# from aircox_streamer import controllers + + +class TestStreamer: + def test_socket_path(self): + pass + + def test_is_ready(self): + pass + + def test_is_running(self): + pass + + def test_playlists(self): + pass + + def test_queues(self): + pass + + def test_send(self): + pass + + def test_init_sources(self): + pass + + def test_make_config(self): + pass + + def test_sync(self): + pass + + def test_fetch(self): + pass + + def test_get_process_args(self): + pass + + def test_check_zombie_process(self): + pass + + def test_run_process(self): + pass + + def test_kill_process(self): + pass + + def test_wait_process(self): + pass