
- Add configuration files for packaging - Precommit now uses ruff Co-authored-by: bkfox <thomas bkfox net> Reviewed-on: #127
294 lines
6.9 KiB
Python
294 lines
6.9 KiB
Python
import itertools
|
|
import os
|
|
|
|
from datetime import datetime, time
|
|
import tzlocal
|
|
|
|
import pytest
|
|
from model_bakery import baker
|
|
|
|
from aircox import models
|
|
from aircox_streamer import controllers
|
|
from aircox_streamer.connector import Connector
|
|
|
|
|
|
# Timezone of the machine running the tests, as reported by tzlocal.
local_tz = tzlocal.get_localzone()

# "working_dir" directory sitting next to this file; used below as the
# station path and as the location of the test socket.
working_dir = os.path.join(os.path.dirname(__file__), "working_dir")
|
|
|
|
|
|
class FakeSocket:
    """In-memory stand-in for a socket object, used by the connector tests.

    Everything passed to :meth:`sendall` is decoded and recorded in
    ``sent_data``. :meth:`recv` replays whatever has been assigned to
    ``recv_data``: either a list of chunks (one chunk per call) or a single
    ``str``/``bytes`` value consumed ``count`` items at a time.
    """

    FAILING_ADDRESS = -1
    """Connect with this address fails."""

    family, type, address = None, None, None
    sent_data = None
    """List of data that have been `send[all]`"""
    recv_data = None
    """Response data to return on recv."""

    def __init__(self, family, type):
        """Store the socket family and type, and reset the data buffers."""
        self.family = family
        self.type = type
        self.sent_data = []
        self.recv_data = ""

    def connect(self, address):
        """Remember ``address``.

        :raises RuntimeError: when ``address`` is ``FAILING_ADDRESS``.
        """
        if address == self.FAILING_ADDRESS:
            raise RuntimeError("invalid connection")
        self.address = address

    def close(self):
        """Do nothing: there is no real resource to release."""

    def sendall(self, data):
        """Record ``data`` (bytes), decoded to ``str``, in ``sent_data``."""
        self.sent_data.append(data.decode())

    def recv(self, count):
        """Return at most ``count`` items of the pending response as bytes.

        When nothing is left to return, ``b"\\nEND"`` is returned instead of
        an empty value.
        """
        if isinstance(self.recv_data, list):
            # List mode: hand out one chunk per call.
            if self.recv_data:
                data, self.recv_data = self.recv_data[0], self.recv_data[1:]
            else:
                data = ""
        else:
            # Buffer mode: consume the buffer `count` items at a time.
            data = self.recv_data
            self.recv_data = self.recv_data[count:]
        data = data[:count]
        return (data.encode("utf-8") if isinstance(data, str) else data) or b"\nEND"

    def is_sent(self, data):
        """Return True if provided data have been sent."""
        # use [:-1] because connector add "\n" at sent data
        # Evaluate the comparison itself: testing the truthiness of the
        # matched string would miss an empty string that was sent.
        return any(sent == data or sent[:-1] == data for sent in self.sent_data)
|
|
|
|
|
|
# -- models
|
|
@pytest.fixture
def station():
    """Return a saved station named "test", flagged default and active."""
    obj = models.Station(
        name="test",
        path=working_dir,
        default=True,
        active=True,
    )
    obj.save()
    return obj
|
|
|
|
|
|
@pytest.fixture
def stations(station):
    """Return the ``station`` fixture followed by three bulk-created stations.

    The extra stations are named/slugged ``test-0`` .. ``test-2``; only the
    first of them is created with ``default=True``.
    """
    extra = []
    for index in range(3):
        label = f"test-{index}"
        extra.append(
            models.Station(
                name=label,
                slug=label,
                path=working_dir,
                default=(index == 0),
                active=True,
            )
        )
    models.Station.objects.bulk_create(extra)
    return [station, *extra]
|
|
|
|
|
|
@pytest.fixture
def station_ports(station):
    """Return the ports created by ``_stations_ports`` for ``station``."""
    ports = _stations_ports(station)
    return ports
|
|
|
|
|
|
@pytest.fixture
def stations_ports(stations):
    """Return the ports created by ``_stations_ports`` for all ``stations``."""
    ports = _stations_ports(*stations)
    return ports
|
|
|
|
|
|
def _stations_ports(*stations):
    """Create and return two active ports for each given station.

    For every station, in the order given, an HTTP input port is followed by
    a file output port. All ports are inserted with one ``bulk_create`` call.
    """
    ports = []
    for station in stations:
        http_input = models.Port(
            station=station,
            direction=models.Port.DIRECTION_INPUT,
            type=models.Port.TYPE_HTTP,
            active=True,
        )
        file_output = models.Port(
            station=station,
            direction=models.Port.DIRECTION_OUTPUT,
            type=models.Port.TYPE_FILE,
            active=True,
        )
        ports.extend((http_input, file_output))
    models.Port.objects.bulk_create(ports)
    return ports
|
|
|
|
|
|
@pytest.fixture
def program(station):
    """Return a saved program titled "test" attached to ``station``."""
    obj = models.Program(title="test", station=station)
    obj.save()
    return obj
|
|
|
|
|
|
@pytest.fixture
def stream(program):
    """Return a saved stream of ``program`` running from 10:12 to 12:13."""
    begin, end = time(10, 12), time(12, 13)
    obj = models.Stream(program=program, begin=begin, end=end)
    obj.save()
    return obj
|
|
|
|
|
|
@pytest.fixture
def episode(program):
    """Return a baked episode titled "test episode" for ``program``."""
    obj = baker.make(
        models.Episode,
        title="test episode",
        program=program,
    )
    return obj
|
|
|
|
|
|
@pytest.fixture
def sound(program, episode):
    """Return a saved archive sound "sound" linked to program and episode.

    The instance is saved with ``check=False``.
    """
    fields = {
        "program": program,
        "episode": episode,
        "name": "sound",
        "type": models.Sound.TYPE_ARCHIVE,
        "position": 0,
        "file": "sound.mp3",
    }
    obj = models.Sound(**fields)
    obj.save(check=False)
    return obj
|
|
|
|
|
|
@pytest.fixture
def sounds(program):
    """Return three bulk-created archive sounds of ``program``.

    Sound ``i`` is named ``sound {i}``, sits at position ``i`` and points to
    the file ``sound-{i}.mp3``.
    """
    created = []
    for index in range(3):
        created.append(
            models.Sound(
                name=f"sound {index}",
                program=program,
                type=models.Sound.TYPE_ARCHIVE,
                position=index,
                file=f"sound-{index}.mp3",
            )
        )
    models.Sound.objects.bulk_create(created)
    return created
|
|
|
|
|
|
# -- connectors
|
|
@pytest.fixture
def connector():
    """Yield a connector on ``test.sock`` that uses ``FakeSocket``.

    The connector is closed once the test is over.
    """
    address = os.path.join(working_dir, "test.sock")
    conn = Connector(address)
    conn.socket_class = FakeSocket
    yield conn
    conn.close()
|
|
|
|
|
|
@pytest.fixture
def fail_connector():
    """Yield a ``FakeSocket`` connector targeting the failing address.

    The connector is closed once the test is over.
    """
    conn = Connector(FakeSocket.FAILING_ADDRESS)
    conn.socket_class = FakeSocket
    yield conn
    conn.close()
|
|
|
|
|
|
@pytest.fixture
def controller(station, connector):
    """Open ``connector`` and return a streamer controller for ``station``."""
    connector.open()
    instance = controllers.Streamer(station, connector)
    return instance
|
|
|
|
|
|
@pytest.fixture
def socket(controller):
    """Return the socket held by the controller's connector."""
    conn = controller.connector
    return conn.socket
|
|
|
|
|
|
# -- metadata
|
|
@pytest.fixture
def metadata(controller):
    """Return a ``Metadata`` built from ``controller`` and the value ``1``."""
    instance = controllers.Metadata(controller, 1)
    return instance
|
|
|
|
|
|
@pytest.fixture
def metadata_data_air_time():
    """Return a fixed air time (2023-05-01 12:10:05) in the local timezone.

    Depending on the installed tzlocal release, ``local_tz`` is either a
    pytz-style timezone exposing ``localize()`` or a ``zoneinfo.ZoneInfo``
    that has no such method; both are handled.
    """
    naive = datetime(2023, 5, 1, 12, 10, 5)
    localize = getattr(local_tz, "localize", None)
    if localize is not None:
        # pytz-style timezones must attach themselves through localize().
        return localize(naive)
    # Standard tzinfo objects (e.g. zoneinfo) are attached directly.
    return naive.replace(tzinfo=local_tz)
|
|
|
|
|
|
@pytest.fixture
def metadata_data(metadata_data_air_time):
    """Return sample metadata values for a request that is playing.

    ``on_air`` is the air time formatted as ``%Y/%m/%d %H:%M:%S``.
    """
    on_air = metadata_data_air_time.strftime("%Y/%m/%d %H:%M:%S")
    data = {
        "rid": 1,
        "initial_uri": "request_uri",
        "on_air": on_air,
        "status": "playing",
    }
    return data
|
|
|
|
|
|
@pytest.fixture
def metadata_string(metadata_data):
    """Return ``metadata_data`` as ``key=value`` lines followed by ``END``."""
    pairs = [f"{key}={value}" for key, value in metadata_data.items()]
    body = "\n".join(pairs)
    return body + "\nEND"
|
|
|
|
|
|
# -- streamers
|
|
class FakeStreamer(controllers.Streamer):
    """Streamer double: skips the parent initializer and records calls.

    Keyword arguments given to the constructor become instance attributes.
    ``calls`` maps the name of each recorded method to the recorded value.
    """

    # Kept as a class-level default so `FakeStreamer.calls` still resolves.
    calls = {}
    is_ready = False

    def __init__(self, **kwargs):
        # Each instance gets its own record: mutating the class-level dict
        # would share recorded calls between all streamers (and tests).
        self.calls = {}
        self.__dict__.update(**kwargs)

    def fetch(self):
        """Record that ``fetch`` has been called."""
        self.calls["fetch"] = True
|
|
|
|
|
|
class FakeSource(controllers.Source):
    """Source double that records every call in ``self.calls``.

    The parent initializer is not invoked; positional and keyword arguments
    are simply kept on the instance as ``args`` and ``kwargs``.
    """

    def __init__(self, id, *args, **kwargs):
        self.id = id
        self.args = args
        self.kwargs = kwargs
        self.calls = {}

    def fetch(self):
        """Record that ``fetch`` has been called."""
        self.calls["fetch"] = True
        # NOTE(review): the previous implementation recorded "sync" here
        # (apparent copy-paste slip); the key is kept so assertions written
        # against it still pass. Drop it once callers are confirmed.
        self.calls["sync"] = True

    def sync(self):
        """Record that ``sync`` has been called."""
        self.calls["sync"] = True

    def push(self, *path):
        """Record the pushed paths and return them as a tuple."""
        self.calls["push"] = path
        return path

    def skip(self):
        """Record that ``skip`` has been called."""
        self.calls["skip"] = True

    def restart(self):
        """Record that ``restart`` has been called."""
        self.calls["restart"] = True

    def seek(self, c):
        """Record the value passed to ``seek``."""
        self.calls["seek"] = c
|
|
|
|
|
|
class FakePlaylist(FakeSource, controllers.PlaylistSource):
    """Playlist source double; ``FakeSource`` comes first in the bases."""
|
|
|
|
|
|
class FakeQueueSource(FakeSource, controllers.QueueSource):
    """Queue source double; ``FakeSource`` comes first in the bases."""
|
|
|
|
|
|
@pytest.fixture
def streamer(station, station_ports):
    """Return a ``FakeStreamer`` with three fake playlists and a fake queue.

    The queue source is stored as ``dealer`` and is also the last item of
    ``sources``.
    """
    fake = FakeStreamer(station=station)
    fake.sources = [FakePlaylist(index, uri=f"source-{index}") for index in range(3)]
    queue = FakeQueueSource(len(fake.sources))
    fake.dealer = queue
    fake.sources.append(queue)
    return fake
|
|
|
|
|
|
@pytest.fixture
def streamers(stations, stations_ports):
    """Return a ``Streamers`` registry holding one fake streamer per station.

    Each streamer gets three fake playlists plus one fake queue source.
    """
    registry = controllers.Streamers(streamer_class=FakeStreamer)
    # avoid unnecessary db calls
    by_pk = {}
    for station in stations:
        by_pk[station.pk] = FakeStreamer(station=station)
    registry.streamers = by_pk
    for position, fake in enumerate(registry.values()):
        fake.sources = [FakePlaylist(index, uri=f"source-{position}-{index}") for index in range(3)]
        queue = FakeQueueSource(len(fake.sources))
        fake.sources.append(queue)
    return registry
|