write tests for streamer

This commit is contained in:
bkfox 2023-06-12 14:09:06 +02:00
parent b586bc5309
commit 3d76b656d2
11 changed files with 254 additions and 41 deletions

View File

@ -49,7 +49,6 @@ class Connector:
socket.AF_UNIX if isinstance(self.address, str) else socket.AF_INET
)
try:
print("-----", self.address)
self.socket = self.socket_class(family, socket.SOCK_STREAM)
self.socket.connect(self.address)
return 0

View File

@ -173,7 +173,7 @@ class Streamer:
self.check_zombie_process()
self.process = subprocess.Popen(args, stderr=subprocess.STDOUT)
atexit.register(lambda: self.kill_process())
atexit.register(self.kill_process)
def kill_process(self):
if self.process:
@ -184,6 +184,7 @@ class Streamer:
)
self.process.kill()
self.process = None
atexit.unregister(self.kill_process)
def wait_process(self):
"""Wait for the process to terminate if there is a process."""

View File

View File

@ -1,3 +1,5 @@
import os
from datetime import datetime, time
import tzlocal
@ -11,6 +13,9 @@ from aircox_streamer.connector import Connector
local_tz = tzlocal.get_localzone()
working_dir = os.path.join(os.path.dirname(__file__), "working_dir")
class FakeSocket:
FAILING_ADDRESS = -1
"""Connect with this address fails."""
@ -62,7 +67,7 @@ class FakeSocket:
@pytest.fixture
def station():
station = models.Station(
name="test", path="/tmp", default=True, active=True
name="test", path=working_dir, default=True, active=True
)
station.save()
return station
@ -123,7 +128,7 @@ def sounds(program):
# -- connectors
@pytest.fixture
def connector():
obj = Connector("/tmp/test.sock")
obj = Connector(os.path.join(working_dir, "test.sock"))
obj.socket_class = FakeSocket
yield obj
obj.close()

View File

@ -0,0 +1,39 @@
import atexit as o_atexit
import subprocess as o_subprocess
import psutil as o_psutil
from . import atexit, subprocess, psutil
modules = [
(o_atexit, atexit, {}),
(o_subprocess, subprocess, {}),
(o_psutil, psutil, {}),
]
def init_mappings():
for original, spoof, mapping in modules:
if mapping:
continue
mapping.update(
{
attr: (getattr(original, attr, None), spoofed)
for attr, spoofed in vars(spoof).items()
if not attr.startswith("_") and hasattr(original, attr)
}
)
def setup():
for original, spoof, mappings in modules:
for attr, (orig, spoofed) in mappings.items():
setattr(original, attr, spoofed)
def setdown():
for original, spoof, mappings in modules:
for attr, (orig, spoofed) in mappings.items():
setattr(original, attr, orig)
init_mappings()

View File

@ -0,0 +1,10 @@
registered = []
"""Items registered by register()"""
def register(func, *args, **kwargs):
registered.append(func)
def unregister(func):
registered.remove(func)

View File

@ -0,0 +1,15 @@
"""Spoof psutil module in order to run and check tests."""
class FakeNetConnection:
def __init__(self, laddr, pid=None):
self.laddr = laddr
self.pid = pid
def net_connections(*args, **kwargs):
return net_connections.result
net_connections.result = []
"""Result value of net_connections call."""

View File

@ -0,0 +1,39 @@
"""Spoof psutil module in order to run and check tests Resulting values of
method calls are set inside `fixtures` module."""
STDOUT = 1
STDERR = 2
STDIN = 3
class FakeProcess:
args = None
kwargs = None
"""Kwargs passed to Popen."""
killed = False
"""kill() have been called."""
waited = False
"""wait() have been called."""
polled = False
"""poll() have been called."""
poll_result = None
"""Result of poll() method."""
def __init__(self, args=[], kwargs={}):
self.pid = -13
self.args = args
self.kwargs = kwargs
def kill(self):
self.killed = True
def wait(self):
self.waited = True
def poll(self):
self.polled = True
return self.poll_result
def Popen(args, **kwargs):
return FakeProcess(args, kwargs)

View File

@ -1,6 +1,9 @@
import json
import os
import socket
from .conftest import working_dir
class TestConnector:
payload = "non_value_info\n" 'a="value_1"\n' 'b="value_b"\n' "END"
@ -13,7 +16,9 @@ class TestConnector:
assert connector.is_open
assert connector.socket.family == socket.AF_UNIX
assert connector.socket.type == socket.SOCK_STREAM
assert connector.socket.address == "/tmp/test.sock"
assert connector.socket.address == os.path.join(
working_dir, "test.sock"
)
connector.close()
def test_open_af_inet(self, connector):

View File

@ -1,50 +1,150 @@
# import pytest
import os
# from aircox_streamer import controllers
import pytest
from aircox_streamer import controllers
from . import fake_modules
from .fake_modules import atexit, subprocess, psutil
class FakeSource:
synced = False
def sync(self):
self.synced = True
@pytest.fixture
def streamer(station, connector, station_ports, stream):
fake_modules.setup()
streamer = controllers.Streamer(station, connector)
psutil.net_connections.result = [
psutil.FakeNetConnection(streamer.socket_path, None),
]
yield streamer
fake_modules.setdown()
class TestStreamer:
def test_socket_path(self):
@pytest.mark.django_db
def test_socket_path(self, streamer):
assert streamer.socket_path == streamer.connector.address
@pytest.mark.django_db
def test_is_ready(self, streamer, socket):
socket.recv_data = "item 1\nEND"
assert streamer.is_ready
@pytest.mark.django_db
def test_is_ready_false(self, streamer, socket):
socket.recv_data = ""
assert not streamer.is_ready
@pytest.mark.django_db
def test_is_running(self, streamer):
streamer.process = subprocess.FakeProcess()
streamer.process.poll_result = None
assert streamer.is_running
@pytest.mark.django_db
def test_is_running_no_process(self, streamer):
streamer.process = None
assert not streamer.is_running
@pytest.mark.django_db
def test_is_running_process_died(self, streamer):
process = subprocess.FakeProcess()
process.poll_result = 1
streamer.process = process
assert not streamer.is_running
assert streamer.process is None
assert process.polled
@pytest.mark.django_db
def test_playlists(self, streamer, program):
result = list(streamer.playlists)
assert len(result) == 1
result = result[0]
assert isinstance(result, controllers.PlaylistSource)
assert result.program == program
@pytest.mark.django_db
def test_queues(self, streamer):
result = list(streamer.queues)
assert len(result) == 1
assert result[0] == streamer.dealer
@pytest.mark.django_db
def test_init_sources(self, streamer, program):
streamer.init_sources()
assert isinstance(streamer.dealer, controllers.QueueSource)
# one for dealer, one for program
assert len(streamer.sources) == 2
assert streamer.sources[1].program == program
@pytest.mark.django_db
def test_make_config(self, streamer):
streamer.make_config()
assert os.path.exists(streamer.path)
@pytest.mark.django_db
def test_sync(self, streamer):
streamer.sources = [FakeSource(), FakeSource()]
streamer.sync()
assert all(source.synced for source in streamer.sources)
@pytest.mark.django_db
def test_fetch(self, streamer):
pass
def test_is_ready(self):
pass
@pytest.mark.django_db
def test_get_process_args(self, streamer):
assert streamer.get_process_args() == [
"liquidsoap",
"-v",
streamer.path,
]
def test_is_running(self):
pass
@pytest.mark.django_db
def test_check_zombie_process(self, streamer):
with open(streamer.socket_path, "w+") as file:
file.write("data")
# This test is incomplete, but we can not go further because os module
# is not spoofed (too much work) to check if os.kill is called.
streamer.check_zombie_process()
def test_playlists(self):
pass
@pytest.mark.django_db
def test_check_zombie_process_no_socket(self, streamer):
if os.path.exists(streamer.socket_path):
os.remove(streamer.socket_path)
streamer.check_zombie_process()
def test_queues(self):
pass
@pytest.mark.django_db
def test_run_process(self, streamer):
if os.path.exists(streamer.socket_path):
os.remove(streamer.socket_path)
streamer.run_process()
process = streamer.process
def test_send(self):
pass
assert process.args == streamer.get_process_args()
assert streamer.kill_process in atexit.registered
def test_init_sources(self):
pass
@pytest.mark.django_db
def test_kill_process(self, streamer):
streamer.run_process()
process = streamer.process
streamer.kill_process()
def test_make_config(self):
pass
assert process.killed
assert streamer.process is None
assert streamer.kill_process not in atexit.registered
def test_sync(self):
pass
@pytest.mark.django_db
def test_wait_process(self, streamer):
process = subprocess.FakeProcess()
streamer.process = process
streamer.wait_process()
def test_fetch(self):
pass
def test_get_process_args(self):
pass
def test_check_zombie_process(self):
pass
def test_run_process(self):
pass
def test_kill_process(self):
pass
def test_wait_process(self):
pass
assert process.waited
assert streamer.process is None