stash
This commit is contained in:
51
aircox_streamer/tests/conftest.py
Normal file
51
aircox_streamer/tests/conftest.py
Normal file
@ -0,0 +1,51 @@
|
||||
import pytest
|
||||
|
||||
from aircox_streamer import connector
|
||||
|
||||
|
||||
class FakeSocket:
|
||||
FAILING_ADDRESS = -1
|
||||
"""Connect with this address fails."""
|
||||
|
||||
family, type, address = None, None, None
|
||||
sent_data = None
|
||||
"""List of data that have been `send[all]`"""
|
||||
recv_data = None
|
||||
"""Response data to return on recv."""
|
||||
|
||||
def __init__(self, family, type):
|
||||
self.family = family
|
||||
self.type = type
|
||||
self.sent_data = []
|
||||
|
||||
def connect(self, address):
|
||||
if address == self.FAILING_ADDRESS:
|
||||
raise RuntimeError("invalid connection")
|
||||
self.address = address
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
def sendall(self, data):
|
||||
self.sent_data.append(data)
|
||||
|
||||
def recv(self, count):
|
||||
data = self.recv_data[:count]
|
||||
self.recv_data = data[count:]
|
||||
return data.encode("utf-8") if isinstance(data, str) else data
|
||||
|
||||
|
||||
class Connector(connector.Connector):
|
||||
socket_class = FakeSocket
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def connector(request):
|
||||
obj = Connector("test")
|
||||
yield obj
|
||||
obj.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fail_connector():
|
||||
return Connector(FakeSocket.FAILING_ADDRESS)
|
67
aircox_streamer/tests/test_connector.py
Normal file
67
aircox_streamer/tests/test_connector.py
Normal file
@ -0,0 +1,67 @@
|
||||
import json
|
||||
import socket
|
||||
|
||||
|
||||
from aircox_streamer.connector import Connector
|
||||
|
||||
|
||||
class TestConnector:
|
||||
payload = "non_value_info\n" 'a="value_1"\n' 'b="value_b"\n' "END"
|
||||
"""Test payload."""
|
||||
payload_data = {"a": "value_1", "b": "value_b"}
|
||||
"""Resulting data of payload."""
|
||||
|
||||
def test_open(self, connector):
|
||||
assert connector.open() == 0
|
||||
assert connector.is_open
|
||||
assert connector.socket.family == socket.AF_UNIX
|
||||
assert connector.socket.type == socket.SOCK_STREAM
|
||||
assert connector.socket.address == "test"
|
||||
connector.close()
|
||||
|
||||
def test_open_af_inet(self):
|
||||
address = ("test", 30)
|
||||
connector = Connector(address)
|
||||
assert connector.open() == 0
|
||||
assert connector.is_open
|
||||
assert connector.socket.family == socket.AF_INET
|
||||
assert connector.socket.type == socket.SOCK_STREAM
|
||||
assert connector.socket.address == address
|
||||
|
||||
def test_open_is_already_open(self, connector):
|
||||
connector.open()
|
||||
assert connector.open() == 1
|
||||
|
||||
def test_open_failure(self, fail_connector):
|
||||
assert fail_connector.open() == -1
|
||||
assert fail_connector.socket is None # close() called
|
||||
|
||||
def test_close(self, connector):
|
||||
connector.open()
|
||||
assert connector.socket is not None
|
||||
connector.close()
|
||||
assert connector.socket is None
|
||||
|
||||
def test_send(self, connector):
|
||||
connector.socket.recv_data = self.payload
|
||||
result = connector.send("fake_action", parse=True)
|
||||
assert result == self.payload_data
|
||||
|
||||
def test_send_open_failure(self, fail_connector):
|
||||
assert fail_connector.send("fake_action", parse=True) is None
|
||||
|
||||
def test_parse(self, connector):
|
||||
result = connector.parse(self.payload)
|
||||
assert result == self.payload_data
|
||||
|
||||
def test_parse_json(self, connector):
|
||||
# include case where json string is surrounded by '"'
|
||||
dumps = '"' + json.dumps(self.payload_data) + '"'
|
||||
result = connector.parse_json(dumps)
|
||||
assert result == self.payload_data
|
||||
|
||||
def test_parse_json_empty_value(self, connector):
|
||||
assert connector.parse_json('""') is None
|
||||
|
||||
def test_parse_json_failure(self, connector):
|
||||
assert connector.parse_json("-- invalid json string --") is None
|
35
aircox_streamer/tests/test_controllers_base.py
Normal file
35
aircox_streamer/tests/test_controllers_base.py
Normal file
@ -0,0 +1,35 @@
|
||||
# import pytest
|
||||
|
||||
# from aircox_streamer import controllers
|
||||
|
||||
|
||||
class TestBaseMetaData:
|
||||
def test_is_playing(self):
|
||||
pass
|
||||
|
||||
def test_status_verbose(self):
|
||||
pass
|
||||
|
||||
def test_fetch(self):
|
||||
pass
|
||||
|
||||
def test_fetch_no_data(self):
|
||||
pass
|
||||
|
||||
def test_validate_status_playing(self):
|
||||
pass
|
||||
|
||||
def test_validate_status_paused(self):
|
||||
pass
|
||||
|
||||
def test_validate_status_stopped(self):
|
||||
pass
|
||||
|
||||
def test_validate_air_time(self):
|
||||
pass
|
||||
|
||||
def test_validate_air_time_none(self):
|
||||
pass
|
||||
|
||||
def test_validate(self):
|
||||
pass
|
51
aircox_streamer/tests/test_controllers_sources.py
Normal file
51
aircox_streamer/tests/test_controllers_sources.py
Normal file
@ -0,0 +1,51 @@
|
||||
# import pytest
|
||||
|
||||
# from aircox_streamer import controllers
|
||||
|
||||
|
||||
class TestSource:
|
||||
def test_station(self):
|
||||
pass
|
||||
|
||||
def test_sync(self):
|
||||
pass
|
||||
|
||||
def test_fetch(self):
|
||||
pass
|
||||
|
||||
def test_skip(self):
|
||||
pass
|
||||
|
||||
def test_restart(self):
|
||||
pass
|
||||
|
||||
def test_seek(self, n):
|
||||
pass
|
||||
|
||||
|
||||
class TestPlaylistSource:
|
||||
def test_get_sound_queryset(self):
|
||||
pass
|
||||
|
||||
def test_get_playlist(self):
|
||||
pass
|
||||
|
||||
def test_write_playlist(self):
|
||||
pass
|
||||
|
||||
def test_stream(self):
|
||||
pass
|
||||
|
||||
def test_sync(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestQueueSource:
|
||||
def test_push(self):
|
||||
pass
|
||||
|
||||
def test_fetch(self):
|
||||
pass
|
||||
|
||||
def test_requests(self):
|
||||
pass
|
50
aircox_streamer/tests/test_controllers_streamer.py
Normal file
50
aircox_streamer/tests/test_controllers_streamer.py
Normal file
@ -0,0 +1,50 @@
|
||||
# import pytest
|
||||
|
||||
# from aircox_streamer import controllers
|
||||
|
||||
|
||||
class TestStreamer:
|
||||
def test_socket_path(self):
|
||||
pass
|
||||
|
||||
def test_is_ready(self):
|
||||
pass
|
||||
|
||||
def test_is_running(self):
|
||||
pass
|
||||
|
||||
def test_playlists(self):
|
||||
pass
|
||||
|
||||
def test_queues(self):
|
||||
pass
|
||||
|
||||
def test_send(self):
|
||||
pass
|
||||
|
||||
def test_init_sources(self):
|
||||
pass
|
||||
|
||||
def test_make_config(self):
|
||||
pass
|
||||
|
||||
def test_sync(self):
|
||||
pass
|
||||
|
||||
def test_fetch(self):
|
||||
pass
|
||||
|
||||
def test_get_process_args(self):
|
||||
pass
|
||||
|
||||
def test_check_zombie_process(self):
|
||||
pass
|
||||
|
||||
def test_run_process(self):
|
||||
pass
|
||||
|
||||
def test_kill_process(self):
|
||||
pass
|
||||
|
||||
def test_wait_process(self):
|
||||
pass
|
Reference in New Issue
Block a user