Source code for bismuthclient.rpcconnections

"""
Bismuth default/legacy connection layer.
Json over sockets
This file is no more compatible with the Bismuth code, it's been converted to a class
EggPool 2018
"""

import json
import socket
import time
import threading

# Logical timeout
LTIMEOUT = 45
# Fixed header length
SLEN = 10


__version__ = '0.1.8'


[docs]class Connection(object): """Connection to a Bismuth Node. Handles auto reconnect when needed""" __slots__ = ('ipport', 'verbose', 'sdef', 'stats', 'last_activity', 'command_lock', 'raw') def __init__(self, ipport, verbose=False, raw=False): """ipport is an (ip, port) tuple""" self.ipport = ipport if ':' in ipport: ip, port = ipport.split(':') self.ipport = (ip, int(port)) self.verbose = verbose self.raw = raw self.sdef = None self.last_activity = 0 self.command_lock = threading.Lock() self.check_connection()
[docs] def check_connection(self): """Check connection state and reconnect if needed.""" if not self.sdef: try: if self.verbose: print("Connecting to", self.ipport) self.sdef = socket.socket() self.sdef.connect(self.ipport) self.last_activity = time.time() except Exception as e: self.sdef = None raise RuntimeError("Connections: {}".format(e))
def _send(self, data, slen=SLEN, retry=True): """Sends something to the server""" self.check_connection() try: self.sdef.settimeout(LTIMEOUT) # Make sure the packet is sent in one call sdata = str(json.dumps(data)) res = self.sdef.sendall(str(len(sdata)).encode("utf-8").zfill(slen)+sdata.encode("utf-8")) if self.raw: print("sending raw:") print(str(len(sdata)).encode("utf-8").zfill(slen)+sdata.encode("utf-8")) self.last_activity = time.time() # res is always 0 on linux if self.verbose: print("send ", data) return True except Exception as e: # send failed, try to reconnect # TODO: handle tries # self.sdef = None if retry: if self.verbose: print("Send failed ({}), trying to reconnect".format(e)) self.check_connection() else: if self.verbose: print("Send failed ({}), not retrying.".format(e)) return False try: self.sdef.settimeout(LTIMEOUT) # Make sure the packet is sent in one call self.sdef.sendall(str(len(str(json.dumps(data)))).encode("utf-8").zfill(slen)+str(json.dumps(data)).encode("utf-8")) return True except Exception as e: self.sdef = None raise RuntimeError("Connections: {}".format(e)) def _receive(self, slen=SLEN): """Wait for an answer, for LTIMEOUT sec.""" self.check_connection() self.sdef.settimeout(LTIMEOUT) if self.raw: print("getting raw:") try: data = self.sdef.recv(slen) if self.raw: raw = data if not data: raise RuntimeError("Socket EOF") data = int(data) # receive length except socket.timeout as e: self.sdef = None return "" try: chunks = [] bytes_recd = 0 while bytes_recd < data: chunk = self.sdef.recv(min(data - bytes_recd, 2048)) if not chunk: raise RuntimeError("Socket EOF2") chunks.append(chunk) bytes_recd = bytes_recd + len(chunk) self.last_activity = time.time() if self.raw: print(raw + b''.join(chunks)) segments = b''.join(chunks).decode("utf-8") return json.loads(segments) except Exception as e: """ exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(exc_type, fname, exc_tb.tb_lineno) """ self.sdef = None raise RuntimeError("Connections: {}".format(e))
[docs] def command(self, command, options=None): """ Sends a command and return it's raw result. options has to be a list. Each item of options will be sent separately. So If you ant to send a list, pass a list of list. """ with self.command_lock: try: self._send(command) if options: for option in options: self._send(option, retry=False) ret = self._receive() return ret except Exception as e: """ exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(exc_type, fname, exc_tb.tb_lineno) """ # TODO : better handling of tries and delay between if self.verbose: print("Error <{}> sending command, trying to reconnect.".format(e)) self.check_connection() self._send(command) if options: for option in options: self._send(option, retry=False) ret = self._receive() return ret
[docs] def close(self): """Close the socket""" try: self.sdef.close() except Exception as e: pass
if __name__ == "__main__": print("I'm a module, can't run!")