aircox/aircox_streamer/tests/test_controllers_streamer.py
Thomas Kairos b453c821c7 #106: tests: aircox_streamer (#110)
- Writes tests for aircox streamer application;
- Add test utilities in aircox

Co-authored-by: bkfox <thomas bkfox net>
Reviewed-on: #110
2023-06-18 17:00:08 +02:00

151 lines
4.4 KiB
Python

import os
import pytest
from aircox_streamer import controllers
from . import fake_modules
from .fake_modules import atexit, subprocess, psutil
class FakeSource:
synced = False
def sync(self):
self.synced = True
@pytest.fixture
def streamer(station, connector, station_ports, stream):
fake_modules.setup()
streamer = controllers.Streamer(station, connector)
psutil.net_connections.result = [
psutil.FakeNetConnection(streamer.socket_path, None),
]
yield streamer
fake_modules.setdown()
class TestStreamer:
@pytest.mark.django_db
def test_socket_path(self, streamer):
assert streamer.socket_path == streamer.connector.address
@pytest.mark.django_db
def test_is_ready(self, streamer, socket):
socket.recv_data = "item 1\nEND"
assert streamer.is_ready
@pytest.mark.django_db
def test_is_ready_false(self, streamer, socket):
socket.recv_data = ""
assert not streamer.is_ready
@pytest.mark.django_db
def test_is_running(self, streamer):
streamer.process = subprocess.FakeProcess()
streamer.process.poll_result = None
assert streamer.is_running
@pytest.mark.django_db
def test_is_running_no_process(self, streamer):
streamer.process = None
assert not streamer.is_running
@pytest.mark.django_db
def test_is_running_process_died(self, streamer):
process = subprocess.FakeProcess()
process.poll_result = 1
streamer.process = process
assert not streamer.is_running
assert streamer.process is None
assert process.polled
@pytest.mark.django_db
def test_playlists(self, streamer, program):
result = list(streamer.playlists)
assert len(result) == 1
result = result[0]
assert isinstance(result, controllers.PlaylistSource)
assert result.program == program
@pytest.mark.django_db
def test_queues(self, streamer):
result = list(streamer.queues)
assert len(result) == 1
assert result[0] == streamer.dealer
@pytest.mark.django_db
def test_init_sources(self, streamer, program):
streamer.init_sources()
assert isinstance(streamer.dealer, controllers.QueueSource)
# one for dealer, one for program
assert len(streamer.sources) == 2
assert streamer.sources[1].program == program
@pytest.mark.django_db
def test_make_config(self, streamer):
streamer.make_config()
assert os.path.exists(streamer.path)
@pytest.mark.django_db
def test_sync(self, streamer):
streamer.sources = [FakeSource(), FakeSource()]
streamer.sync()
assert all(source.synced for source in streamer.sources)
@pytest.mark.django_db
def test_fetch(self, streamer):
pass
@pytest.mark.django_db
def test_get_process_args(self, streamer):
assert streamer.get_process_args() == [
"liquidsoap",
"-v",
streamer.path,
]
@pytest.mark.django_db
def test_check_zombie_process(self, streamer):
with open(streamer.socket_path, "w+") as file:
file.write("data")
# This test is incomplete, but we can not go further because os module
# is not spoofed (too much work) to check if os.kill is called.
streamer.check_zombie_process()
@pytest.mark.django_db
def test_check_zombie_process_no_socket(self, streamer):
if os.path.exists(streamer.socket_path):
os.remove(streamer.socket_path)
streamer.check_zombie_process()
@pytest.mark.django_db
def test_run_process(self, streamer):
if os.path.exists(streamer.socket_path):
os.remove(streamer.socket_path)
streamer.run_process()
process = streamer.process
assert process.args == streamer.get_process_args()
assert streamer.kill_process in atexit.registered
@pytest.mark.django_db
def test_kill_process(self, streamer):
streamer.run_process()
process = streamer.process
streamer.kill_process()
assert process.killed
assert streamer.process is None
assert streamer.kill_process not in atexit.registered
@pytest.mark.django_db
def test_wait_process(self, streamer):
process = subprocess.FakeProcess()
streamer.process = process
streamer.wait_process()
assert process.waited
assert streamer.process is None