diff --git a/aircox_streamer/connector.py b/aircox_streamer/connector.py index 18925dd..e666e75 100755 --- a/aircox_streamer/connector.py +++ b/aircox_streamer/connector.py @@ -49,10 +49,14 @@ class Connector: socket.AF_UNIX if isinstance(self.address, str) else socket.AF_INET ) try: + print("-----", self.address) self.socket = self.socket_class(family, socket.SOCK_STREAM) self.socket.connect(self.address) return 0 except Exception: + import traceback + + traceback.print_exc() self.close() return -1 diff --git a/aircox_streamer/controllers/__init__.py b/aircox_streamer/controllers/__init__.py index 5ba80f1..6fd5578 100644 --- a/aircox_streamer/controllers/__init__.py +++ b/aircox_streamer/controllers/__init__.py @@ -1,9 +1,16 @@ # TODO: for the moment, update in station and program names do not update the # related fields. -from .base import Request +from .metadata import Metadata, Request from .streamer import Streamer from .sources import Source, PlaylistSource, QueueSource -__all__ = ("Request", "Streamer", "Source", "PlaylistSource", "QueueSource") +__all__ = ( + "Metadata", + "Request", + "Streamer", + "Source", + "PlaylistSource", + "QueueSource", +) diff --git a/aircox_streamer/controllers/base.py b/aircox_streamer/controllers/metadata.py similarity index 73% rename from aircox_streamer/controllers/base.py rename to aircox_streamer/controllers/metadata.py index 9037bbf..6c3ddab 100755 --- a/aircox_streamer/controllers/base.py +++ b/aircox_streamer/controllers/metadata.py @@ -6,7 +6,7 @@ from django.utils.translation import gettext_lazy as _ __all__ = ( - "BaseMetadata", + "Metadata", "Request", ) @@ -19,7 +19,7 @@ logger = logging.getLogger("aircox") # correctly. -class BaseMetadata: +class Metadata: """Base class for handling request metadata.""" controller = None @@ -43,29 +43,34 @@ class BaseMetadata: @property def is_playing(self): + """True if the source is playing.""" + # FIXME: validate on controller's current source? return self.status == "playing" @property def status_verbose(self): - return self.validate_status(self.status, True) + """Verbose version of self's status (translated string).""" + status = self.validate_status(self.status) + return _(status) if status else "" def fetch(self): data = self.controller.send("request.metadata ", self.rid, parse=True) if data: self.validate(data) - def validate_status(self, status, i18n=False): + def validate_status(self, status): + """Return correct status for this metadata based on provided one and + controller. + + :returns: status string + """ on_air = self.controller.source - if ( - on_air - and status == "playing" - and (on_air == self or on_air.rid == self.rid) - ): - return _("playing") if i18n else "playing" - elif status == "playing": - return _("paused") if i18n else "paused" + if "playing" and on_air and (on_air == self or on_air.rid == self.rid): + return "playing" + elif status in ("paused", "playing"): + return "paused" else: - return _("stopped") if i18n else "stopped" + return "stopped" def validate_air_time(self, air_time): if air_time: @@ -85,6 +90,6 @@ class BaseMetadata: self.request_status = data.get("status") -class Request(BaseMetadata): +class Request(Metadata): title = None artist = None diff --git a/aircox_streamer/controllers/sources.py b/aircox_streamer/controllers/sources.py index 0d928d7..70b9e3b 100755 --- a/aircox_streamer/controllers/sources.py +++ b/aircox_streamer/controllers/sources.py @@ -1,20 +1,9 @@ -import atexit -import logging import os -import re -import signal -import subprocess - -import psutil import tzlocal -from django.template.loader import render_to_string -from django.utils import timezone as tz -from django.utils.translation import gettext_lazy as _ -from aircox.conf import settings from aircox.utils import to_seconds -from .connector import Connector +from .metadata import Metadata, Request __all__ = ( @@ -23,254 +12,10 @@ __all__ = ( "QueueSource", ) - local_tz = tzlocal.get_localzone() -logger = logging.getLogger("aircox") -class BaseMetadata: - """Base class for handling request metadata.""" - - controller = None - """Controller.""" - rid = None - """Request id.""" - uri = None - """Request uri.""" - status = None - """Current playing status.""" - request_status = None - """Requests' status.""" - air_time = None - """Launch datetime.""" - - def __init__(self, controller=None, rid=None, data=None): - self.controller = controller - self.rid = rid - if data is not None: - self.validate(data) - - @property - def is_playing(self): - return self.status == "playing" - - @property - def status_verbose(self): - return self.validate_status(self.status, True) - - def fetch(self): - data = self.controller.send("request.metadata ", self.rid, parse=True) - if data: - self.validate(data) - - def validate_status(self, status, i18n=False): - on_air = self.controller.source - if ( - on_air - and status == "playing" - and (on_air == self or on_air.rid == self.rid) - ): - return _("playing") if i18n else "playing" - elif status == "playing": - return _("paused") if i18n else "paused" - else: - return _("stopped") if i18n else "stopped" - - def validate_air_time(self, air_time): - if air_time: - air_time = tz.datetime.strptime(air_time, "%Y/%m/%d %H:%M:%S") - return local_tz.localize(air_time) - - def validate(self, data): - """Validate provided data and set as attribute (must already be - declared)""" - for key, value in data.items(): - if hasattr(self, key) and not callable(getattr(self, key)): - setattr(self, key, value) - self.uri = data.get("initial_uri") - - self.air_time = self.validate_air_time(data.get("on_air")) - self.status = self.validate_status(data.get("status")) - self.request_status = data.get("status") - - -class Request(BaseMetadata): - title = None - artist = None - - -class Streamer: - connector = None - process = None - - station = None - template_name = "aircox_streamer/scripts/station.liq" - path = None - """Config path.""" - sources = None - """List of all monitored sources.""" - source = None - """Current source being played on air.""" - # note: we disable on_air rids since we don't have use of it for the - # moment - # on_air = None - # """ On-air request ids (rid) """ - inputs = None - """Queryset to input ports.""" - outputs = None - """Queryset to output ports.""" - - def __init__(self, station, connector=None): - self.station = station - self.inputs = self.station.port_set.active().input() - self.outputs = self.station.port_set.active().output() - - self.id = self.station.slug.replace("-", "_") - self.path = os.path.join(station.path, "station.liq") - self.connector = Connector(os.path.join(station.path, "station.sock")) - self.init_sources() - - @property - def socket_path(self): - """Path to Unix socket file.""" - return self.connector.address - - @property - def is_ready(self): - """If external program is ready to use, returns True.""" - return self.send("list") != "" - - @property - def is_running(self): - """True if holds a running process.""" - if self.process is None: - return False - - returncode = self.process.poll() - if returncode is None: - return True - - self.process = None - logger.debug("process died with return code %s" % returncode) - return False - - @property - def playlists(self): - return (s for s in self.sources if isinstance(s, PlaylistSource)) - - @property - def queues(self): - return (s for s in self.sources if isinstance(s, QueueSource)) - - # Sources and config ############################################### - def send(self, *args, **kwargs): - return self.connector.send(*args, **kwargs) or "" - - def init_sources(self): - streams = self.station.program_set.filter(stream__isnull=False) - self.dealer = QueueSource(self, "dealer") - self.sources = [self.dealer] + [ - PlaylistSource(self, program=program) for program in streams - ] - - def make_config(self): - """Make configuration files and directory (and sync sources)""" - data = render_to_string( - self.template_name, - { - "station": self.station, - "streamer": self, - "settings": settings, - }, - ) - data = re.sub("[\t ]+\n", "\n", data) - data = re.sub("\n{3,}", "\n\n", data) - - os.makedirs(os.path.dirname(self.path), exist_ok=True) - with open(self.path, "w+") as file: - file.write(data) - - self.sync() - - def sync(self): - """Sync all sources.""" - for source in self.sources: - source.sync() - - def fetch(self): - """Fetch data from liquidsoap.""" - for source in self.sources: - source.fetch() - - # request.on_air is not ordered: we need to do it manually - self.source = next( - iter( - sorted( - ( - source - for source in self.sources - if source.request_status == "playing" - and source.air_time - ), - key=lambda o: o.air_time, - reverse=True, - ) - ), - None, - ) - - # Process ########################################################## - def get_process_args(self): - return ["liquidsoap", "-v", self.path] - - def check_zombie_process(self): - if not os.path.exists(self.socket_path): - return - - conns = [ - conn - for conn in psutil.net_connections(kind="unix") - if conn.laddr == self.socket_path - ] - for conn in conns: - if conn.pid is not None: - os.kill(conn.pid, signal.SIGKILL) - - def run_process(self): - """Execute the external application with corresponding informations. - - This function must make sure that all needed files have been - generated. - """ - if self.process: - return - - args = self.get_process_args() - if not args: - return - - self.check_zombie_process() - self.process = subprocess.Popen(args, stderr=subprocess.STDOUT) - atexit.register(lambda: self.kill_process()) - - def kill_process(self): - if self.process: - logger.debug( - "kill process %s: %s", - self.process.pid, - " ".join(self.get_process_args()), - ) - self.process.kill() - self.process = None - - def wait_process(self): - """Wait for the process to terminate if there is a process.""" - if self.process: - self.process.wait() - self.process = None - - -class Source(BaseMetadata): +class Source(Metadata): controller = None """Parent controller.""" id = None diff --git a/aircox_streamer/controllers/streamer.py b/aircox_streamer/controllers/streamer.py index 5527176..5404a97 100755 --- a/aircox_streamer/controllers/streamer.py +++ b/aircox_streamer/controllers/streamer.py @@ -6,7 +6,6 @@ import signal import subprocess import psutil -import tzlocal from django.template.loader import render_to_string from aircox.conf import settings @@ -17,7 +16,6 @@ from .sources import PlaylistSource, QueueSource __all__ = ("Streamer",) -local_tz = tzlocal.get_localzone() logger = logging.getLogger("aircox") @@ -49,7 +47,9 @@ class Streamer: self.id = self.station.slug.replace("-", "_") self.path = os.path.join(station.path, "station.liq") - self.connector = Connector(os.path.join(station.path, "station.sock")) + self.connector = connector or Connector( + os.path.join(station.path, "station.sock") + ) self.init_sources() @property diff --git a/aircox_streamer/serializers.py b/aircox_streamer/serializers.py index ce0ae79..ac3a2a3 100644 --- a/aircox_streamer/serializers.py +++ b/aircox_streamer/serializers.py @@ -22,18 +22,18 @@ class BaseSerializer(serializers.Serializer): return reverse(self.url_name, kwargs=kwargs) -class BaseMetadataSerializer(BaseSerializer): +class MetadataSerializer(BaseSerializer): rid = serializers.IntegerField() air_time = serializers.DateTimeField() uri = serializers.CharField() -class RequestSerializer(BaseMetadataSerializer): +class RequestSerializer(MetadataSerializer): title = serializers.CharField(required=False) artist = serializers.CharField(required=False) -class SourceSerializer(BaseMetadataSerializer): +class SourceSerializer(MetadataSerializer): id = serializers.CharField() uri = serializers.CharField() rid = serializers.IntegerField() diff --git a/aircox_streamer/tests/conftest.py b/aircox_streamer/tests/conftest.py index 076105a..ffbb5e9 100644 --- a/aircox_streamer/tests/conftest.py +++ b/aircox_streamer/tests/conftest.py @@ -1,6 +1,14 @@ +from datetime import datetime +import tzlocal + import pytest -from aircox_streamer import connector +from aircox.models import Station, Port +from aircox_streamer import controllers +from aircox_streamer.connector import Connector + + +local_tz = tzlocal.get_localzone() class FakeSocket: @@ -17,6 +25,7 @@ class FakeSocket: self.family = family self.type = type self.sent_data = [] + self.recv_data = "" def connect(self, address): if address == self.FAILING_ADDRESS: @@ -30,22 +39,99 @@ class FakeSocket: self.sent_data.append(data) def recv(self, count): - data = self.recv_data[:count] - self.recv_data = data[count:] + if isinstance(self.recv_data, list): + if len(self.recv_data): + data, self.recv_data = self.recv_data[0], self.recv_data[1:] + else: + data = "" + else: + data = self.recv_data + self.recv_data = self.recv_data[count:] + data = data[:count] return data.encode("utf-8") if isinstance(data, str) else data -class Connector(connector.Connector): - socket_class = FakeSocket +# -- models +@pytest.fixture +def station(): + station = Station(name="test", path="/tmp", default=True, active=True) + station.save() + return station @pytest.fixture -def connector(request): - obj = Connector("test") +def station_ports(station): + ports = [ + Port( + station=station, + direction=Port.DIRECTION_INPUT, + type=Port.TYPE_HTTP, + active=True, + ), + Port( + station=station, + direction=Port.DIRECTION_OUTPUT, + type=Port.TYPE_FILE, + active=True, + ), + ] + for port in ports: + port.save() + return ports + + +# -- connectors +@pytest.fixture +def connector(): + obj = Connector("/tmp/test.sock") + obj.socket_class = FakeSocket yield obj obj.close() @pytest.fixture def fail_connector(): - return Connector(FakeSocket.FAILING_ADDRESS) + obj = Connector(FakeSocket.FAILING_ADDRESS) + obj.socket_class = FakeSocket + yield obj + obj.close() + + +@pytest.fixture +def controller(station, connector): + connector.open() + return controllers.Streamer(station, connector) + + +@pytest.fixture +def socket(controller): + return controller.connector.socket + + +# -- metadata +@pytest.fixture +def metadata(controller): + return controllers.Metadata(controller, 1) + + +@pytest.fixture +def metadata_data_air_time(): + return local_tz.localize(datetime(2023, 5, 1, 12, 10, 5)) + + +@pytest.fixture +def metadata_data(metadata_data_air_time): + return { + "rid": 1, + "initial_uri": "request_uri", + "on_air": metadata_data_air_time.strftime("%Y/%m/%d %H:%M:%S"), + "status": "playing", + } + + +@pytest.fixture +def metadata_string(metadata_data): + return ( + "\n".join(f"{key}={value}" for key, value in metadata_data.items()) + + "\nEND" + ) diff --git a/aircox_streamer/tests/test_connector.py b/aircox_streamer/tests/test_connector.py index e3c601f..700af55 100644 --- a/aircox_streamer/tests/test_connector.py +++ b/aircox_streamer/tests/test_connector.py @@ -2,9 +2,6 @@ import json import socket -from aircox_streamer.connector import Connector - - class TestConnector: payload = "non_value_info\n" 'a="value_1"\n' 'b="value_b"\n' "END" """Test payload.""" @@ -16,12 +13,12 @@ class TestConnector: assert connector.is_open assert connector.socket.family == socket.AF_UNIX assert connector.socket.type == socket.SOCK_STREAM - assert connector.socket.address == "test" + assert connector.socket.address == "/tmp/test.sock" connector.close() - def test_open_af_inet(self): + def test_open_af_inet(self, connector): address = ("test", 30) - connector = Connector(address) + connector.address = address assert connector.open() == 0 assert connector.is_open assert connector.socket.family == socket.AF_INET @@ -43,6 +40,7 @@ class TestConnector: assert connector.socket is None def test_send(self, connector): + connector.open() connector.socket.recv_data = self.payload result = connector.send("fake_action", parse=True) assert result == self.payload_data diff --git a/aircox_streamer/tests/test_controllers_base.py b/aircox_streamer/tests/test_controllers_base.py deleted file mode 100644 index 157f62c..0000000 --- a/aircox_streamer/tests/test_controllers_base.py +++ /dev/null @@ -1,35 +0,0 @@ -# import pytest - -# from aircox_streamer import controllers - - -class TestBaseMetaData: - def test_is_playing(self): - pass - - def test_status_verbose(self): - pass - - def test_fetch(self): - pass - - def test_fetch_no_data(self): - pass - - def test_validate_status_playing(self): - pass - - def test_validate_status_paused(self): - pass - - def test_validate_status_stopped(self): - pass - - def test_validate_air_time(self): - pass - - def test_validate_air_time_none(self): - pass - - def test_validate(self): - pass diff --git a/aircox_streamer/tests/test_controllers_metadata.py b/aircox_streamer/tests/test_controllers_metadata.py new file mode 100644 index 0000000..3e940a3 --- /dev/null +++ b/aircox_streamer/tests/test_controllers_metadata.py @@ -0,0 +1,59 @@ +import pytest + +from aircox_streamer.controllers import Metadata + + +class TestBaseMetaData: + @pytest.mark.django_db + def test_is_playing(self, metadata): + metadata.status = "playing" + assert metadata.is_playing + + @pytest.mark.django_db + def test_is_playing_false(self, metadata): + metadata.status = "other" + assert not metadata.is_playing + + @pytest.mark.django_db + def test_fetch(self, controller, metadata, metadata_data, metadata_string): + controller.connector.socket.recv_data = metadata_string + metadata.fetch() + assert metadata.uri == metadata_data["initial_uri"] + + @pytest.mark.django_db + def test_validate_status_playing(self, controller, metadata): + controller.source = metadata + assert metadata.validate_status("playing") == "playing" + + @pytest.mark.django_db + def test_validate_status_paused(self, controller, metadata): + controller.source = Metadata(controller, metadata.rid + 1) + assert metadata.validate_status("playing") == "paused" + + @pytest.mark.django_db + def test_validate_status_stopped(self, controller, metadata): + controller.source = Metadata(controller, 2) + assert metadata.validate_status("") == "stopped" + assert metadata.validate_status("any") == "stopped" + + @pytest.mark.django_db + def test_validate_air_time( + self, metadata, metadata_data, metadata_data_air_time + ): + air_time = metadata_data["on_air"] + result = metadata.validate_air_time(air_time) + assert result == metadata_data_air_time + + @pytest.mark.django_db + def test_validate_air_time_none(self, metadata): + assert metadata.validate_air_time("") is None + + @pytest.mark.django_db + def test_validate(self, metadata, metadata_data, metadata_data_air_time): + metadata.validate(metadata_data) + assert metadata.uri == metadata_data["initial_uri"] + assert metadata.air_time == metadata_data_air_time + # controller.source != metadata + status = "playing" + # => status == "paused" + assert metadata.status == "paused" + assert metadata.request_status == "playing" diff --git a/aircox_streamer/tests/test_controllers_sources.py b/aircox_streamer/tests/test_controllers_sources.py index 612c749..a9d9629 100644 --- a/aircox_streamer/tests/test_controllers_sources.py +++ b/aircox_streamer/tests/test_controllers_sources.py @@ -1,51 +1,81 @@ -# import pytest +import pytest -# from aircox_streamer import controllers +from aircox_streamer.controllers import Source + + +@pytest.fixture +def source(controller): + return Source(controller, 13) class TestSource: - def test_station(self): - pass + @pytest.mark.django_db + def test_station(self, source, station): + assert source.station == station - def test_sync(self): - pass + @pytest.mark.django_db + def test_fetch(self, socket, source, metadata_string): + remaining = 3.12 + socket.recv_data = [ + f"{remaining} END", + metadata_string, + ] - def test_fetch(self): - pass + source.fetch() + assert f"{source.id}.remaining" in socket.sent_data + assert f"{source.id}.get" in socket.sent_data - def test_skip(self): - pass + assert source.remaining == remaining + assert source["request_uri"] - def test_restart(self): - pass + @pytest.mark.django_db + def test_skip(self, socket, source): + source.skip() + assert f"{source.id}.skip" in socket.sent_data - def test_seek(self, n): - pass + @pytest.mark.django_db + def test_restart(self, socket, source): + source.skip() + prefix = f"{source.id}.seek" + assert any(r for r in socket.sent_data if r.startswith(prefix)) + + @pytest.mark.django_db + def test_seek(self, socket, source): + source.seek(10) + assert f"{source.id}.skip 10" in socket.sent_data class TestPlaylistSource: + @pytest.mark.django_db def test_get_sound_queryset(self): pass + @pytest.mark.django_db def test_get_playlist(self): pass + @pytest.mark.django_db def test_write_playlist(self): pass + @pytest.mark.django_db def test_stream(self): pass + @pytest.mark.django_db def test_sync(self): pass class TestQueueSource: + @pytest.mark.django_db def test_push(self): pass + @pytest.mark.django_db def test_fetch(self): pass + @pytest.mark.django_db def test_requests(self): pass