write tests for sources

This commit is contained in:
bkfox 2023-06-11 16:29:38 +02:00
parent cbd28bc946
commit b586bc5309
3 changed files with 161 additions and 56 deletions

View File

@ -76,7 +76,7 @@ class PlaylistSource(Source):
self.program = program self.program = program
super().__init__(controller, id=id, **kwargs) super().__init__(controller, id=id, **kwargs)
self.path = os.path.join(self.station.path, self.id + ".m3u") self.path = os.path.join(self.station.path, f"{self.id}.m3u")
def get_sound_queryset(self): def get_sound_queryset(self):
"""Get playlist's sounds queryset.""" """Get playlist's sounds queryset."""
@ -115,23 +115,6 @@ class QueueSource(Source):
queue = None queue = None
"""Source's queue (excluded on_air request)""" """Source's queue (excluded on_air request)"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def push(self, *paths):
"""Add the provided paths to source's play queue."""
for path in paths:
self.controller.send(self.id, "_queue.push ", path)
def fetch(self):
super().fetch()
queue = self.controller.send(self.id, "_queue.queue").strip()
if not queue:
self.queue = []
return
self.queue = queue.split(" ")
@property @property
def requests(self): def requests(self):
"""Queue as requests metadata.""" """Queue as requests metadata."""
@ -139,3 +122,20 @@ class QueueSource(Source):
for request in requests: for request in requests:
request.fetch() request.fetch()
return requests return requests
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def push(self, *paths):
"""Add the provided paths to source's play queue."""
for path in paths:
self.controller.send(f"{self.id}_queue.push {path}")
def fetch(self):
super().fetch()
queue = self.controller.send(f"{self.id}_queue.queue").strip()
if not queue:
self.queue = []
return
self.queue = queue.split(" ")

View File

@ -1,9 +1,9 @@
from datetime import datetime from datetime import datetime, time
import tzlocal import tzlocal
import pytest import pytest
from aircox.models import Station, Port from aircox import models
from aircox_streamer import controllers from aircox_streamer import controllers
from aircox_streamer.connector import Connector from aircox_streamer.connector import Connector
@ -36,7 +36,7 @@ class FakeSocket:
pass pass
def sendall(self, data): def sendall(self, data):
self.sent_data.append(data) self.sent_data.append(data.decode())
def recv(self, count): def recv(self, count):
if isinstance(self.recv_data, list): if isinstance(self.recv_data, list):
@ -48,36 +48,76 @@ class FakeSocket:
data = self.recv_data data = self.recv_data
self.recv_data = self.recv_data[count:] self.recv_data = self.recv_data[count:]
data = data[:count] data = data[:count]
return data.encode("utf-8") if isinstance(data, str) else data return (
data.encode("utf-8") if isinstance(data, str) else data
) or b"\nEND"
def is_sent(self, data):
"""Return True if provided data have been sent."""
# use [:-1] because connector add "\n" at sent data
return any(r for r in self.sent_data if r == data or r[:-1] == data)
# -- models # -- models
@pytest.fixture @pytest.fixture
def station(): def station():
station = Station(name="test", path="/tmp", default=True, active=True) station = models.Station(
name="test", path="/tmp", default=True, active=True
)
station.save() station.save()
return station return station
@pytest.fixture @pytest.fixture
def station_ports(station): def station_ports(station):
ports = [ items = [
Port( models.Port(
station=station, station=station,
direction=Port.DIRECTION_INPUT, direction=models.Port.DIRECTION_INPUT,
type=Port.TYPE_HTTP, type=models.Port.TYPE_HTTP,
active=True, active=True,
), ),
Port( models.Port(
station=station, station=station,
direction=Port.DIRECTION_OUTPUT, direction=models.Port.DIRECTION_OUTPUT,
type=Port.TYPE_FILE, type=models.Port.TYPE_FILE,
active=True, active=True,
), ),
] ]
for port in ports: models.Port.objects.bulk_create(items)
port.save() return items
return ports
@pytest.fixture
def program(station):
program = models.Program(title="test", station=station)
program.save()
return program
@pytest.fixture
def stream(program):
stream = models.Stream(
program=program, begin=time(10, 12), end=time(12, 13)
)
stream.save()
return stream
@pytest.fixture
def sounds(program):
items = [
models.Sound(
name=f"sound {i}",
program=program,
type=models.Sound.TYPE_ARCHIVE,
position=i,
file=f"sound-{i}.mp3",
)
for i in range(0, 3)
]
models.Sound.objects.bulk_create(items)
return items
# -- connectors # -- connectors

View File

@ -1,6 +1,13 @@
import os
import pytest import pytest
from aircox_streamer.controllers import Source from aircox_streamer.controllers import (
Source,
PlaylistSource,
QueueSource,
Request,
)
@pytest.fixture @pytest.fixture
@ -8,6 +15,16 @@ def source(controller):
return Source(controller, 13) return Source(controller, 13)
@pytest.fixture
def playlist_source(controller, program):
return PlaylistSource(controller, 14, program)
@pytest.fixture
def queue_source(controller):
return QueueSource(controller, 15)
class TestSource: class TestSource:
@pytest.mark.django_db @pytest.mark.django_db
def test_station(self, source, station): def test_station(self, source, station):
@ -22,60 +39,108 @@ class TestSource:
] ]
source.fetch() source.fetch()
assert f"{source.id}.remaining" in socket.sent_data assert socket.is_sent(f"{source.id}.remaining")
assert f"{source.id}.get" in socket.sent_data assert socket.is_sent(f"{source.id}.get")
assert source.remaining == remaining assert source.remaining == remaining
assert source["request_uri"] assert source.request_status
@pytest.mark.django_db @pytest.mark.django_db
def test_skip(self, socket, source): def test_skip(self, socket, source):
socket.recv_data = "\nEND"
source.skip() source.skip()
assert f"{source.id}.skip" in socket.sent_data assert socket.is_sent(f"{source.id}.skip\n")
@pytest.mark.django_db @pytest.mark.django_db
def test_restart(self, socket, source): def test_restart(self, socket, source):
source.skip() source.restart()
prefix = f"{source.id}.seek" prefix = f"{source.id}.seek"
assert any(r for r in socket.sent_data if r.startswith(prefix)) assert any(r for r in socket.sent_data if r.startswith(prefix))
@pytest.mark.django_db @pytest.mark.django_db
def test_seek(self, socket, source): def test_seek(self, socket, source):
source.seek(10) source.seek(10)
assert f"{source.id}.skip 10" in socket.sent_data assert socket.is_sent(f"{source.id}.seek 10")
class TestPlaylistSource: class TestPlaylistSource:
@pytest.mark.django_db @pytest.mark.django_db
def test_get_sound_queryset(self): def test_get_sound_queryset(self, playlist_source, sounds):
pass query = playlist_source.get_sound_queryset()
assert all(
r.program_id == playlist_source.program.pk
and r.type == r.TYPE_ARCHIVE
for r in query
)
@pytest.mark.django_db @pytest.mark.django_db
def test_get_playlist(self): def test_get_playlist(self, playlist_source, sounds):
pass expected = {r.file.path for r in sounds}
query = playlist_source.get_playlist()
assert all(r in expected for r in query)
@pytest.mark.django_db @pytest.mark.django_db
def test_write_playlist(self): def test_write_playlist(self, playlist_source):
pass playlist = ["/tmp/a", "/tmp/b"]
playlist_source.write_playlist(playlist)
with open(playlist_source.path, "r") as file:
result = file.read()
os.remove(playlist_source.path)
assert result == "\n".join(playlist)
@pytest.mark.django_db @pytest.mark.django_db
def test_stream(self): def test_stream(self, playlist_source, stream):
pass result = playlist_source.stream()
assert result == {
"begin": stream.begin.strftime("%Hh%M"),
"end": stream.end.strftime("%Hh%M"),
"delay": 0,
}
@pytest.mark.django_db @pytest.mark.django_db
def test_sync(self): def test_sync(self, playlist_source):
pass # spoof method
playlist = ["/tmp/a", "/tmp/b"]
written_playlist = []
playlist_source.get_playlist = lambda: playlist
playlist_source.write_playlist = lambda p: written_playlist.extend(p)
playlist_source.sync()
assert written_playlist == playlist
class TestQueueSource: class TestQueueSource:
@pytest.mark.django_db @pytest.mark.django_db
def test_push(self): def test_requests(self, queue_source, socket, metadata_string):
pass queue_source.queue = [13, 14, 15]
socket.recv_data = [
f"{metadata_string}\nEND" for _ in queue_source.queue
]
requests = queue_source.requests
assert all(isinstance(r, Request) for r in requests)
assert all(r.uri for r in requests)
@pytest.mark.django_db @pytest.mark.django_db
def test_fetch(self): def test_push(self, queue_source, socket):
pass paths = ["/tmp/a", "/tmp/b"]
queue_source.push(*paths)
assert all(
socket.is_sent(f"{queue_source.id}_queue.push {path}")
for path in paths
)
@pytest.mark.django_db @pytest.mark.django_db
def test_requests(self): def test_fetch(self, queue_source, socket, metadata_string):
pass queue = ["13", "14", "15"]
socket.recv_data = [
# Source fetch remaining & metadata
"13 END",
metadata_string,
" ".join(queue) + "\nEND",
]
queue_source.fetch()
assert queue_source.uri
assert queue_source.queue == queue