"""Tests for ``aircox_streamer.controllers.Streamer``."""
import os

import pytest

from aircox_streamer import controllers
from . import fake_modules
from .fake_modules import atexit, subprocess, psutil


class FakeSource:
    """Minimal source stand-in that records whether ``sync`` was called."""

    synced = False

    def sync(self):
        """Flag this source as synced."""
        self.synced = True


@pytest.fixture
def streamer(station, connector, station_ports, stream):
    """Yield a ``Streamer`` for ``station``/``connector`` with fake modules on.

    ``psutil.net_connections`` is primed with a single fake connection bound
    to the streamer's socket path.

    ``station_ports`` and ``stream`` are not used directly; they are requested
    so those fixtures are set up first (presumably to create related
    objects -- confirm against the conftest).
    """
    fake_modules.setup()
    # pytest skips the code after ``yield`` when the fixture itself raises
    # before yielding, so guard the teardown explicitly: otherwise a failing
    # ``Streamer`` construction would leave the fake modules installed for
    # every following test.
    try:
        streamer = controllers.Streamer(station, connector)
        psutil.net_connections.result = [
            psutil.FakeNetConnection(streamer.socket_path, None),
        ]
        yield streamer
    finally:
        fake_modules.setdown()


class TestStreamer:
    """Unit tests for the ``Streamer`` controller."""

    @pytest.mark.django_db
    def test_socket_path(self, streamer):
        """``socket_path`` mirrors the connector address."""
        assert streamer.socket_path == streamer.connector.address

    @pytest.mark.django_db
    def test_is_ready(self, streamer, socket):
        """``is_ready`` is truthy when the socket returns data."""
        socket.recv_data = "item 1\nEND"
        assert streamer.is_ready

    @pytest.mark.django_db
    def test_is_ready_false(self, streamer, socket):
        """``is_ready`` is falsy when the socket returns nothing."""
        socket.recv_data = ""
        assert not streamer.is_ready

    @pytest.mark.django_db
    def test_is_running(self, streamer):
        """A process whose poll result is ``None`` counts as running."""
        streamer.process = subprocess.FakeProcess()
        streamer.process.poll_result = None
        assert streamer.is_running

    @pytest.mark.django_db
    def test_is_running_no_process(self, streamer):
        """Without a process the streamer is not running."""
        streamer.process = None
        assert not streamer.is_running

    @pytest.mark.django_db
    def test_is_running_process_died(self, streamer):
        """A polled process with an exit code is dropped and not running."""
        process = subprocess.FakeProcess()
        process.poll_result = 1
        streamer.process = process
        assert not streamer.is_running
        assert streamer.process is None
        assert process.polled

    @pytest.mark.django_db
    def test_playlists(self, streamer, program):
        """``playlists`` yields one ``PlaylistSource`` bound to ``program``."""
        result = list(streamer.playlists)
        assert len(result) == 1

        result = result[0]
        assert isinstance(result, controllers.PlaylistSource)
        assert result.program == program

    @pytest.mark.django_db
    def test_queues(self, streamer):
        """``queues`` yields only the dealer."""
        result = list(streamer.queues)
        assert len(result) == 1
        assert result[0] == streamer.dealer

    @pytest.mark.django_db
    def test_init_sources(self, streamer, program):
        """``init_sources`` sets up the dealer and the program source."""
        streamer.init_sources()
        assert isinstance(streamer.dealer, controllers.QueueSource)
        # one for dealer, one for program
        assert len(streamer.sources) == 2
        assert streamer.sources[1].program == program

    @pytest.mark.django_db
    def test_make_config(self, streamer):
        """``make_config`` leaves a file at ``streamer.path``."""
        streamer.make_config()
        assert os.path.exists(streamer.path)

    @pytest.mark.django_db
    def test_sync(self, streamer):
        """``sync`` calls ``sync`` on every source."""
        streamer.sources = [FakeSource(), FakeSource()]
        streamer.sync()
        assert all(source.synced for source in streamer.sources)

    @pytest.mark.django_db
    def test_fetch(self, streamer):
        """Placeholder: ``fetch`` has no assertions yet."""
        # TODO: exercise streamer.fetch(); this test currently checks nothing.
        pass

    @pytest.mark.django_db
    def test_get_process_args(self, streamer):
        """The process command line is ``liquidsoap -v <path>``."""
        assert streamer.get_process_args() == [
            "liquidsoap",
            "-v",
            streamer.path,
        ]

    @pytest.mark.django_db
    def test_check_zombie_process(self, streamer):
        """``check_zombie_process`` runs when the socket file exists."""
        with open(streamer.socket_path, "w+") as socket_file:
            socket_file.write("data")
        # This test is incomplete, but we can not go further because os module
        # is not spoofed (too much work) to check if os.kill is called.
        streamer.check_zombie_process()

    @pytest.mark.django_db
    def test_check_zombie_process_no_socket(self, streamer):
        """``check_zombie_process`` runs when the socket file is absent."""
        if os.path.exists(streamer.socket_path):
            os.remove(streamer.socket_path)
        streamer.check_zombie_process()

    @pytest.mark.django_db
    def test_run_process(self, streamer):
        """``run_process`` starts the process and registers the atexit hook."""
        if os.path.exists(streamer.socket_path):
            os.remove(streamer.socket_path)
        streamer.run_process()
        process = streamer.process

        assert process.args == streamer.get_process_args()
        assert streamer.kill_process in atexit.registered

    @pytest.mark.django_db
    def test_kill_process(self, streamer):
        """``kill_process`` kills, clears and unregisters the process."""
        streamer.run_process()
        # Keep a handle: ``streamer.process`` is reset by ``kill_process``.
        process = streamer.process
        streamer.kill_process()

        assert process.killed
        assert streamer.process is None
        assert streamer.kill_process not in atexit.registered

    @pytest.mark.django_db
    def test_wait_process(self, streamer):
        """``wait_process`` waits on the process, then clears it."""
        process = subprocess.FakeProcess()
        streamer.process = process
        streamer.wait_process()

        assert process.waited
        assert streamer.process is None