117 lines
3.9 KiB
Python
117 lines
3.9 KiB
Python
import pytest
|
|
from unittest.mock import MagicMock, patch
|
|
from aircox_streamer.connector import Connector
|
|
|
|
|
|
class TestConnector:
|
|
|
|
@pytest.mark.django_db
|
|
def test_is_open_unix(self, connector_unix):
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
connector_unix.open()
|
|
assert connector_unix.is_open
|
|
|
|
@pytest.mark.django_db
|
|
def test_is_open_tcpip(self, connector_inet):
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
connector_inet.open()
|
|
assert connector_inet.is_open
|
|
|
|
@pytest.mark.django_db
|
|
def test_init_with_unix_address(self, connector_unix):
|
|
assert connector_unix.address == "path/to/socket"
|
|
|
|
@pytest.mark.django_db
|
|
def test_init_with_tcpip_address(self, connector_inet):
|
|
assert connector_inet.address == ('0.0.0.0', 6767)
|
|
|
|
@pytest.mark.django_db
|
|
def test_init_without_address(self, connector):
|
|
connector = Connector()
|
|
assert connector.address is None
|
|
|
|
#TODO Check if the mock_socket was define with the right arguments (socket.socket(family, socket.SOCK_STREAM))
|
|
@pytest.mark.django_db
|
|
def test_open_unix(self, connector_unix):
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
connector_unix.open()
|
|
connector_unix.socket.connect.assert_called_with("path/to/socket")
|
|
|
|
@pytest.mark.django_db
|
|
def test_open_tcpip(self, connector_inet):
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
connector_inet.open()
|
|
connector_inet.socket.connect.assert_called_with(('0.0.0.0', 6767))
|
|
|
|
@pytest.mark.django_db
|
|
def test_open_with_exeption(self, connector):
|
|
connector.address = 123
|
|
assert connector.open() == -1
|
|
|
|
@pytest.mark.django_db
|
|
def test_open_unix_when_isopen(self, connector_unix):
|
|
connector_unix.is_open == True
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
assert connector_unix.open() == None
|
|
|
|
@pytest.mark.django_db
|
|
def test_open_tcpip_when_isopen(self, connector_inet):
|
|
connector_inet.is_open == True
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
assert connector_inet.open() == None
|
|
|
|
@pytest.mark.django_db
|
|
def test_close_unix(self, connector_unix):
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
connector_unix.open()
|
|
connector_unix.close()
|
|
assert connector_unix.socket == None
|
|
|
|
@pytest.mark.django_db
|
|
def test_close_tcpip(self, connector_inet):
|
|
mock_socket = MagicMock()
|
|
with patch('aircox_streamer.connector.socket', mock_socket):
|
|
connector_inet.open()
|
|
connector_inet.close()
|
|
assert connector_inet.socket == None
|
|
|
|
#issue, don't succeed to fake a response. Pytest always fail by being stuck at the while loop.
|
|
@pytest.mark.django_db
|
|
def test_send_tcpip(self, connector_inet):
|
|
pass
|
|
|
|
@pytest.mark.django_db
|
|
def test_parse(self, connector):
|
|
input_data = 'key1=value1\nkey2=value2\nkey3=value3'
|
|
expected_output = {
|
|
'key1': 'value1',
|
|
'key2': 'value2',
|
|
'key3': 'value3'
|
|
}
|
|
assert connector.parse(input_data) == expected_output
|
|
|
|
@pytest.mark.django_db
|
|
def test_parse_json(self, connector):
|
|
input_data = '{"key1": "value1", "key2": "value2", "key3": "value3"}'
|
|
expected_output = {
|
|
'key1': 'value1',
|
|
'key2': 'value2',
|
|
'key3': 'value3'
|
|
}
|
|
assert connector.parse_json(input_data) == expected_output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|