write metadata tests; start source tests
This commit is contained in:
		@ -1,6 +1,14 @@
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
import tzlocal
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aircox_streamer import connector
 | 
			
		||||
from aircox.models import Station, Port
 | 
			
		||||
from aircox_streamer import controllers
 | 
			
		||||
from aircox_streamer.connector import Connector
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
local_tz = tzlocal.get_localzone()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeSocket:
 | 
			
		||||
@ -17,6 +25,7 @@ class FakeSocket:
 | 
			
		||||
        self.family = family
 | 
			
		||||
        self.type = type
 | 
			
		||||
        self.sent_data = []
 | 
			
		||||
        self.recv_data = ""
 | 
			
		||||
 | 
			
		||||
    def connect(self, address):
 | 
			
		||||
        if address == self.FAILING_ADDRESS:
 | 
			
		||||
@ -30,22 +39,99 @@ class FakeSocket:
 | 
			
		||||
        self.sent_data.append(data)
 | 
			
		||||
 | 
			
		||||
    def recv(self, count):
 | 
			
		||||
        data = self.recv_data[:count]
 | 
			
		||||
        self.recv_data = data[count:]
 | 
			
		||||
        if isinstance(self.recv_data, list):
 | 
			
		||||
            if len(self.recv_data):
 | 
			
		||||
                data, self.recv_data = self.recv_data[0], self.recv_data[1:]
 | 
			
		||||
            else:
 | 
			
		||||
                data = ""
 | 
			
		||||
        else:
 | 
			
		||||
            data = self.recv_data
 | 
			
		||||
            self.recv_data = self.recv_data[count:]
 | 
			
		||||
        data = data[:count]
 | 
			
		||||
        return data.encode("utf-8") if isinstance(data, str) else data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Connector(connector.Connector):
 | 
			
		||||
    socket_class = FakeSocket
 | 
			
		||||
# -- models
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def station():
 | 
			
		||||
    station = Station(name="test", path="/tmp", default=True, active=True)
 | 
			
		||||
    station.save()
 | 
			
		||||
    return station
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def connector(request):
 | 
			
		||||
    obj = Connector("test")
 | 
			
		||||
def station_ports(station):
 | 
			
		||||
    ports = [
 | 
			
		||||
        Port(
 | 
			
		||||
            station=station,
 | 
			
		||||
            direction=Port.DIRECTION_INPUT,
 | 
			
		||||
            type=Port.TYPE_HTTP,
 | 
			
		||||
            active=True,
 | 
			
		||||
        ),
 | 
			
		||||
        Port(
 | 
			
		||||
            station=station,
 | 
			
		||||
            direction=Port.DIRECTION_OUTPUT,
 | 
			
		||||
            type=Port.TYPE_FILE,
 | 
			
		||||
            active=True,
 | 
			
		||||
        ),
 | 
			
		||||
    ]
 | 
			
		||||
    for port in ports:
 | 
			
		||||
        port.save()
 | 
			
		||||
    return ports
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# -- connectors
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def connector():
 | 
			
		||||
    obj = Connector("/tmp/test.sock")
 | 
			
		||||
    obj.socket_class = FakeSocket
 | 
			
		||||
    yield obj
 | 
			
		||||
    obj.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def fail_connector():
 | 
			
		||||
    return Connector(FakeSocket.FAILING_ADDRESS)
 | 
			
		||||
    obj = Connector(FakeSocket.FAILING_ADDRESS)
 | 
			
		||||
    obj.socket_class = FakeSocket
 | 
			
		||||
    yield obj
 | 
			
		||||
    obj.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def controller(station, connector):
 | 
			
		||||
    connector.open()
 | 
			
		||||
    return controllers.Streamer(station, connector)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def socket(controller):
 | 
			
		||||
    return controller.connector.socket
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# -- metadata
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def metadata(controller):
 | 
			
		||||
    return controllers.Metadata(controller, 1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def metadata_data_air_time():
 | 
			
		||||
    return local_tz.localize(datetime(2023, 5, 1, 12, 10, 5))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def metadata_data(metadata_data_air_time):
 | 
			
		||||
    return {
 | 
			
		||||
        "rid": 1,
 | 
			
		||||
        "initial_uri": "request_uri",
 | 
			
		||||
        "on_air": metadata_data_air_time.strftime("%Y/%m/%d %H:%M:%S"),
 | 
			
		||||
        "status": "playing",
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def metadata_string(metadata_data):
 | 
			
		||||
    return (
 | 
			
		||||
        "\n".join(f"{key}={value}" for key, value in metadata_data.items())
 | 
			
		||||
        + "\nEND"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
@ -2,9 +2,6 @@ import json
 | 
			
		||||
import socket
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from aircox_streamer.connector import Connector
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestConnector:
 | 
			
		||||
    payload = "non_value_info\n" 'a="value_1"\n' 'b="value_b"\n' "END"
 | 
			
		||||
    """Test payload."""
 | 
			
		||||
@ -16,12 +13,12 @@ class TestConnector:
 | 
			
		||||
        assert connector.is_open
 | 
			
		||||
        assert connector.socket.family == socket.AF_UNIX
 | 
			
		||||
        assert connector.socket.type == socket.SOCK_STREAM
 | 
			
		||||
        assert connector.socket.address == "test"
 | 
			
		||||
        assert connector.socket.address == "/tmp/test.sock"
 | 
			
		||||
        connector.close()
 | 
			
		||||
 | 
			
		||||
    def test_open_af_inet(self):
 | 
			
		||||
    def test_open_af_inet(self, connector):
 | 
			
		||||
        address = ("test", 30)
 | 
			
		||||
        connector = Connector(address)
 | 
			
		||||
        connector.address = address
 | 
			
		||||
        assert connector.open() == 0
 | 
			
		||||
        assert connector.is_open
 | 
			
		||||
        assert connector.socket.family == socket.AF_INET
 | 
			
		||||
@ -43,6 +40,7 @@ class TestConnector:
 | 
			
		||||
        assert connector.socket is None
 | 
			
		||||
 | 
			
		||||
    def test_send(self, connector):
 | 
			
		||||
        connector.open()
 | 
			
		||||
        connector.socket.recv_data = self.payload
 | 
			
		||||
        result = connector.send("fake_action", parse=True)
 | 
			
		||||
        assert result == self.payload_data
 | 
			
		||||
 | 
			
		||||
@ -1,35 +0,0 @@
 | 
			
		||||
# import pytest
 | 
			
		||||
 | 
			
		||||
# from aircox_streamer import controllers
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestBaseMetaData:
 | 
			
		||||
    def test_is_playing(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_status_verbose(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_fetch(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_fetch_no_data(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_validate_status_playing(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_validate_status_paused(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_validate_status_stopped(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_validate_air_time(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_validate_air_time_none(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def test_validate(self):
 | 
			
		||||
        pass
 | 
			
		||||
							
								
								
									
										59
									
								
								aircox_streamer/tests/test_controllers_metadata.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								aircox_streamer/tests/test_controllers_metadata.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,59 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aircox_streamer.controllers import Metadata
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestBaseMetaData:
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_is_playing(self, metadata):
 | 
			
		||||
        metadata.status = "playing"
 | 
			
		||||
        assert metadata.is_playing
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_is_playing_false(self, metadata):
 | 
			
		||||
        metadata.status = "other"
 | 
			
		||||
        assert not metadata.is_playing
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_fetch(self, controller, metadata, metadata_data, metadata_string):
 | 
			
		||||
        controller.connector.socket.recv_data = metadata_string
 | 
			
		||||
        metadata.fetch()
 | 
			
		||||
        assert metadata.uri == metadata_data["initial_uri"]
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_validate_status_playing(self, controller, metadata):
 | 
			
		||||
        controller.source = metadata
 | 
			
		||||
        assert metadata.validate_status("playing") == "playing"
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_validate_status_paused(self, controller, metadata):
 | 
			
		||||
        controller.source = Metadata(controller, metadata.rid + 1)
 | 
			
		||||
        assert metadata.validate_status("playing") == "paused"
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_validate_status_stopped(self, controller, metadata):
 | 
			
		||||
        controller.source = Metadata(controller, 2)
 | 
			
		||||
        assert metadata.validate_status("") == "stopped"
 | 
			
		||||
        assert metadata.validate_status("any") == "stopped"
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_validate_air_time(
 | 
			
		||||
        self, metadata, metadata_data, metadata_data_air_time
 | 
			
		||||
    ):
 | 
			
		||||
        air_time = metadata_data["on_air"]
 | 
			
		||||
        result = metadata.validate_air_time(air_time)
 | 
			
		||||
        assert result == metadata_data_air_time
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_validate_air_time_none(self, metadata):
 | 
			
		||||
        assert metadata.validate_air_time("") is None
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_validate(self, metadata, metadata_data, metadata_data_air_time):
 | 
			
		||||
        metadata.validate(metadata_data)
 | 
			
		||||
        assert metadata.uri == metadata_data["initial_uri"]
 | 
			
		||||
        assert metadata.air_time == metadata_data_air_time
 | 
			
		||||
        # controller.source != metadata + status = "playing"
 | 
			
		||||
        #       => status == "paused"
 | 
			
		||||
        assert metadata.status == "paused"
 | 
			
		||||
        assert metadata.request_status == "playing"
 | 
			
		||||
@ -1,51 +1,81 @@
 | 
			
		||||
# import pytest
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
# from aircox_streamer import controllers
 | 
			
		||||
from aircox_streamer.controllers import Source
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def source(controller):
 | 
			
		||||
    return Source(controller, 13)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestSource:
 | 
			
		||||
    def test_station(self):
 | 
			
		||||
        pass
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_station(self, source, station):
 | 
			
		||||
        assert source.station == station
 | 
			
		||||
 | 
			
		||||
    def test_sync(self):
 | 
			
		||||
        pass
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_fetch(self, socket, source, metadata_string):
 | 
			
		||||
        remaining = 3.12
 | 
			
		||||
        socket.recv_data = [
 | 
			
		||||
            f"{remaining} END",
 | 
			
		||||
            metadata_string,
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    def test_fetch(self):
 | 
			
		||||
        pass
 | 
			
		||||
        source.fetch()
 | 
			
		||||
        assert f"{source.id}.remaining" in socket.sent_data
 | 
			
		||||
        assert f"{source.id}.get" in socket.sent_data
 | 
			
		||||
 | 
			
		||||
    def test_skip(self):
 | 
			
		||||
        pass
 | 
			
		||||
        assert source.remaining == remaining
 | 
			
		||||
        assert source["request_uri"]
 | 
			
		||||
 | 
			
		||||
    def test_restart(self):
 | 
			
		||||
        pass
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_skip(self, socket, source):
 | 
			
		||||
        source.skip()
 | 
			
		||||
        assert f"{source.id}.skip" in socket.sent_data
 | 
			
		||||
 | 
			
		||||
    def test_seek(self, n):
 | 
			
		||||
        pass
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_restart(self, socket, source):
 | 
			
		||||
        source.skip()
 | 
			
		||||
        prefix = f"{source.id}.seek"
 | 
			
		||||
        assert any(r for r in socket.sent_data if r.startswith(prefix))
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_seek(self, socket, source):
 | 
			
		||||
        source.seek(10)
 | 
			
		||||
        assert f"{source.id}.skip 10" in socket.sent_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestPlaylistSource:
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_get_sound_queryset(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_get_playlist(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_write_playlist(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_stream(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_sync(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestQueueSource:
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_push(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_fetch(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.django_db
 | 
			
		||||
    def test_requests(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user