cfr #121 Co-authored-by: Christophe Siraut <d@tobald.eu.org> Co-authored-by: bkfox <thomas bkfox net> Co-authored-by: Thomas Kairos <thomas@bkfox.net> Reviewed-on: #131 Co-authored-by: Chris Tactic <ctactic@noreply.git.radiocampus.be> Co-committed-by: Chris Tactic <ctactic@noreply.git.radiocampus.be>
		
			
				
	
	
		
			100 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			100 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
import json
 | 
						|
import re
 | 
						|
import socket
 | 
						|
 | 
						|
response_re = re.compile(r"(.*)\s+END\s*$")
 | 
						|
key_val_re = re.compile(r'(?P<key>[^=]+)="?(?P<value>([^"]|\\")+)"?')
 | 
						|
 | 
						|
 | 
						|
class Connector:
 | 
						|
    """Connection to AF_UNIX or AF_INET, get and send data.
 | 
						|
 | 
						|
    Received data can be parsed from list of `key=value` or JSON.
 | 
						|
    """
 | 
						|
 | 
						|
    socket_class = socket.socket
 | 
						|
    """Socket class to instanciate on open."""
 | 
						|
    socket = None
 | 
						|
    """The socket."""
 | 
						|
    address = None
 | 
						|
    """String to a Unix domain socket file, or a tuple (host, port) for TCP/IP
 | 
						|
    connection."""
 | 
						|
 | 
						|
    @property
 | 
						|
    def is_open(self):
 | 
						|
        return self.socket is not None
 | 
						|
 | 
						|
    def __init__(self, address=None):
 | 
						|
        if address:
 | 
						|
            self.address = address
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        r = self.open()
 | 
						|
        if r == -1:
 | 
						|
            raise RuntimeError("can not open socket.")
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self):
 | 
						|
        self.close()
 | 
						|
 | 
						|
    def open(self):
 | 
						|
        """Open connection.
 | 
						|
 | 
						|
        :return: 0 (success), 1 (already opened), -1 (failure)
 | 
						|
        """
 | 
						|
        if self.is_open:
 | 
						|
            return 1
 | 
						|
 | 
						|
        family = socket.AF_UNIX if isinstance(self.address, str) else socket.AF_INET
 | 
						|
        try:
 | 
						|
            self.socket = self.socket_class(family, socket.SOCK_STREAM)
 | 
						|
            self.socket.connect(self.address)
 | 
						|
            return 0
 | 
						|
        except Exception:
 | 
						|
            import traceback
 | 
						|
 | 
						|
            traceback.print_exc()
 | 
						|
            self.close()
 | 
						|
            return -1
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        if self.is_open:
 | 
						|
            self.socket.close()
 | 
						|
            self.socket = None
 | 
						|
 | 
						|
    # FIXME: return None on failed
 | 
						|
    def send(self, *data, try_count=1, parse=False, parse_json=False):
 | 
						|
        if self.open() == -1:
 | 
						|
            return None
 | 
						|
 | 
						|
        data = bytes("".join([str(d) for d in data]) + "\n", encoding="utf-8")
 | 
						|
        try:
 | 
						|
            self.socket.sendall(data)
 | 
						|
            resp = ""
 | 
						|
            while not response_re.search(resp):
 | 
						|
                resp += self.socket.recv(1024).decode("utf-8")
 | 
						|
 | 
						|
            if resp:
 | 
						|
                resp = response_re.sub(r"\1", resp).strip()
 | 
						|
                resp = self.parse(resp) if parse else self.parse_json(resp) if parse_json else resp
 | 
						|
            return resp
 | 
						|
        except Exception:
 | 
						|
            self.close()
 | 
						|
            if try_count > 0:
 | 
						|
                return self.send(data, try_count - 1)
 | 
						|
 | 
						|
    def parse(self, value):
 | 
						|
        return {
 | 
						|
            line.groupdict()["key"]: line.groupdict()["value"]
 | 
						|
            for line in (key_val_re.search(line) for line in value.split("\n"))
 | 
						|
            if line
 | 
						|
        }
 | 
						|
 | 
						|
    def parse_json(self, value):
 | 
						|
        try:
 | 
						|
            if value[0] == '"' and value[-1] == '"':
 | 
						|
                value = value[1:-1]
 | 
						|
            return json.loads(value) if value else None
 | 
						|
        except Exception:
 | 
						|
            return None
 |