qulab package

Before diving into the API, start with the examples folder.

qulab.dht package

This package is based on [kademlia](https://github.com/bmuller/kademlia).

Kademlia is a Python implementation of the Kademlia protocol which utilizes the asyncio library.

qulab.math package

qulab.storage module

class qulab.storage.memstorage.ForgetfulStorage(ttl=604800)[source]

Bases: qulab.storage.memstorage.IStorage

cull()[source]
get(key, default=None)[source]

Get the value stored under the given key. If the key is not found, return default.

iter_older_than(seconds_old)[source]

Return an iterator over (key, value) tuples for items older than seconds_old seconds.
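The interplay of ttl, cull() and iter_older_than() can be illustrated with a minimal sketch. TTLStore below is a hypothetical stand-in written for this page, not the ForgetfulStorage source; it assumes entries are kept in insertion order and expire ttl seconds after they were stored.

```python
import time
from collections import OrderedDict
from itertools import takewhile


class TTLStore:
    """Hypothetical stand-in for a time-limited in-memory store."""

    def __init__(self, ttl=604800):
        self.ttl = ttl  # seconds; 604800 is one week
        self.data = OrderedDict()  # key -> (insert_time, value), oldest first

    def __setitem__(self, key, value):
        # Re-inserting a key moves it to the newest position.
        self.data.pop(key, None)
        self.data[key] = (time.monotonic(), value)
        self.cull()

    def cull(self):
        """Drop every entry older than ttl seconds."""
        for key, _ in list(self.iter_older_than(self.ttl)):
            del self.data[key]

    def get(self, key, default=None):
        self.cull()
        if key in self.data:
            return self.data[key][1]
        return default

    def iter_older_than(self, seconds_old):
        """Yield (key, value) pairs stored at least seconds_old seconds ago."""
        cutoff = time.monotonic() - seconds_old
        # Entries are ordered by insertion time, so stop at the first young one.
        old = takewhile(lambda item: item[1][0] <= cutoff, self.data.items())
        return ((key, value) for key, (_, value) in old)


store = TTLStore(ttl=60)
store["a"] = 1
fresh = store.get("a")            # still within the 60 second window
fallback = store.get("missing", 0)  # unknown key, so the default is returned
```

Keeping the entries ordered by insertion time lets both cull() and iter_older_than() stop at the first entry that is still young instead of scanning the whole store.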

class qulab.storage.memstorage.IStorage[source]

Bases: abc.ABC

Local storage for this node. Implementations of get must return a value of the same type that was stored by set.

abstract get(key, default=None)[source]

Get the value stored under the given key. If the key is not found, return default.

abstract iter_older_than(seconds_old)[source]

Return an iterator over (key, value) tuples for items older than seconds_old seconds.

qulab.exceptions module

exception qulab.exceptions.QuLabDHTMalformedMessage[source]

Bases: qulab.exceptions.QuLabException

Message does not contain what is expected.

exception qulab.exceptions.QuLabException[source]

Bases: Exception

Base exception.

exception qulab.exceptions.QuLabRPCError[source]

Bases: qulab.exceptions.QuLabException

RPC base exception.

exception qulab.exceptions.QuLabRPCServerError[source]

Bases: qulab.exceptions.QuLabRPCError

Server side error.

classmethod make(exce)[source]

exception qulab.exceptions.QuLabRPCTimeout[source]

Bases: qulab.exceptions.QuLabRPCError

Timeout.
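Because every exception above derives from QuLabException, handlers should be ordered from the most specific class to the most general. The sketch below defines local stand-in classes that mirror the documented hierarchy so that it runs without qulab installed; real code would import the classes from qulab.exceptions instead. The classify helper is hypothetical.

```python
# Local stand-ins mirroring the documented hierarchy (assumption: real code
# would use "from qulab.exceptions import ..." instead of these definitions).
class QuLabException(Exception):
    """Base exception."""


class QuLabRPCError(QuLabException):
    """RPC base exception."""


class QuLabRPCServerError(QuLabRPCError):
    """Server side error."""


class QuLabRPCTimeout(QuLabRPCError):
    """Timeout."""


def classify(exc):
    """Hypothetical helper: report which handler catches exc."""
    try:
        raise exc
    except QuLabRPCTimeout:
        return "timeout"
    except QuLabRPCServerError:
        return "server"
    except QuLabRPCError:
        return "rpc"
    except QuLabException:
        return "qulab"


labels = [classify(e) for e in (QuLabRPCTimeout(), QuLabRPCServerError(),
                                QuLabRPCError(), QuLabException())]
```

Placing the QuLabRPCError handler first would swallow both the timeout and the server error, which is why the subclasses are listed before their base.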

qulab.log module

class qulab.log.BaseHandler[source]

Bases: logging.Handler

emit(record)[source]

Emit a log message.

send_bytes(bmsg)[source]
serialize(record)[source]

Serialize the record into binary format and return it, ready for transmission across the socket.

class qulab.log.RedisHandler(conn, channel='log')[source]

Bases: qulab.log.BaseHandler

Publish log records through Redis.

send_bytes(bmsg)[source]

class qulab.log.ZMQHandler(socket: zmq.sugar.socket.Socket)[source]

Bases: qulab.log.BaseHandler

Publish log records through a ZMQ socket.

send_bytes(bmsg)[source]

qulab.log.level()[source]

Get the default log level.
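The handler classes above split the work into emit, serialize and send_bytes, so a new transport only needs to override send_bytes. The following sketch reproduces that split with hypothetical classes (BytesHandler, ListHandler) built directly on logging.Handler. Pickling the record's attribute dict is an assumption borrowed from the standard library's SocketHandler; the wire format qulab.log actually uses is not stated here.

```python
import logging
import pickle


class BytesHandler(logging.Handler):
    """Hypothetical base: serialize each record, then hand the bytes to a transport."""

    def emit(self, record):
        try:
            self.send_bytes(self.serialize(record))
        except Exception:
            self.handleError(record)

    def serialize(self, record):
        # Assumption: pickle the record's attributes with the message pre-formatted,
        # the same approach logging.handlers.SocketHandler takes.
        payload = dict(record.__dict__)
        payload["msg"] = record.getMessage()
        payload["args"] = None
        payload["exc_info"] = None
        return pickle.dumps(payload)

    def send_bytes(self, bmsg):
        raise NotImplementedError


class ListHandler(BytesHandler):
    """Transport that keeps the bytes in memory, standing in for Redis or ZMQ."""

    def __init__(self):
        super().__init__()
        self.sent = []

    def send_bytes(self, bmsg):
        self.sent.append(bmsg)


logger = logging.getLogger("qulab.sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)
logger.info("temperature %s K", 4.2)

# A subscriber can rebuild a LogRecord from the received bytes.
record = logging.makeLogRecord(pickle.loads(handler.sent[0]))
```

A subscriber on the other side of the channel can pass the rebuilt record to its own handlers, which keeps formatting decisions on the receiving end.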

qulab.rpc module

class qulab.rpc.RPCClientMixin[source]

Bases: qulab.rpc.RPCMixin

on_response(source, data)[source]

Client side.

remoteCall(addr, methodNane, args=(), kw=None)[source]
set_timeout(timeout=10)[source]

class qulab.rpc.RPCMixin[source]

Bases: abc.ABC

cancelPending(addr, msgID, cancelRemote)[source]

Give up when the request times out and try to cancel the remote task.

cancelRemoteTask(addr, msgID)[source]

Try to cancel the remote task.

cancelTask(msgID)[source]

Cancel the task for msgID.

close()[source]
createPending(addr, msgID, timeout=1, cancelRemote=True)[source]

Create a future for the request and wait for the response until the timeout expires.

createTask(msgID, coro, timeout=0)[source]

Create a new task for msgID.

handle(source, data)[source]

Handle received data.

Should be called whenever data is received from outside.

is_admin(source, data)[source]
abstract property loop

Event loop.

on_cancel(source, data)[source]
on_ping(source, data)[source]
on_pong(source, data)[source]
on_request(source, data)[source]

Handle request.

Override this method on the server.

on_response(source, data)[source]

Handle response.

Override this method on the client.

on_shutdown(source, data)[source]
property pending
async ping(addr, timeout=1)[source]
async pong(addr)[source]
async request(address, msgID, msg)[source]
async response(address, msgID, msg)[source]
abstract async sendto(data, address)[source]

Send message to address.

async shutdown(address)[source]
start()[source]
stop()[source]
property tasks
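The pending-request idea described for createPending and on_response (one future per msgID, resolved by the response or abandoned at the timeout) can be sketched with plain asyncio. PendingTable and call are hypothetical names, and the simulated reply stands in for a real transport; this is not the RPCMixin implementation.

```python
import asyncio


class PendingTable:
    """Hypothetical helper: one future per outstanding request."""

    def __init__(self):
        self.pending = {}  # msg_id -> asyncio.Future

    def create_pending(self, msg_id):
        fut = asyncio.get_running_loop().create_future()
        self.pending[msg_id] = fut
        # Forget the entry once the future is resolved or cancelled.
        fut.add_done_callback(lambda _: self.pending.pop(msg_id, None))
        return fut

    def on_response(self, msg_id, result):
        fut = self.pending.get(msg_id)
        if fut is not None and not fut.done():
            fut.set_result(result)


async def call(table, msg_id, timeout, reply_after=None):
    fut = table.create_pending(msg_id)
    if reply_after is not None:
        # Simulate the remote side answering after a delay.
        asyncio.get_running_loop().call_later(
            reply_after, table.on_response, msg_id, "pong")
    try:
        return await asyncio.wait_for(fut, timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the future; report the timeout as None.
        return None


async def main():
    table = PendingTable()
    answered = await call(table, b"id-1", timeout=1.0, reply_after=0.01)
    timed_out = await call(table, b"id-2", timeout=0.05)
    return answered, timed_out, len(table.pending)


result = asyncio.run(main())
```

Removing the entry in a done callback covers both outcomes with one code path, so a late response for an abandoned msgID is simply ignored.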
class qulab.rpc.RPCServerMixin[source]

Bases: qulab.rpc.RPCMixin

property executor
abstract getRequestHandler(methodNane, source, msgID)[source]

Get suitable handler for request.

You should implement this method yourself.

async handle_request(source, msgID, method, *args, **kw)[source]

Handle a request from source.

on_request(source, data)[source]

Received a request from source.

class qulab.rpc.ZMQClient(addr, timeout=10, loop=None)[source]

Bases: qulab.rpc.RPCClientMixin

property loop

Event loop.

performMethod(methodNane, args, kw)[source]
async ping(timeout=1)[source]
async run()[source]
async sendto(data, addr)[source]

Send message to address.

class qulab.rpc.ZMQRPCCallable(methodNane, owner)[source]

Bases: object

class qulab.rpc.ZMQServer(loop=None)[source]

Bases: qulab.rpc.RPCServerMixin

property executor
getRequestHandler(methodNane, **kw)[source]

Get suitable handler for request.

You should implement this method yourself.

property loop

Event loop.

property port
async run()[source]
async sendto(data, address)[source]

Send message to address.

set_module(mod)[source]
set_socket(sock)[source]
start()[source]
stop()[source]
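One plausible way for a server to map a requested method name to a handler is attribute lookup on a root object, such as a module registered through set_module. The sketch below is an assumption about that general technique, not the ZMQServer source; get_request_handler is a hypothetical function and math stands in for the served module.

```python
import math


def get_request_handler(root, method_name):
    """Resolve a dotted name such as 'sqrt' or 'pkg.func' against a root object."""
    target = root
    for part in method_name.split("."):
        # Refuse private and dunder names so internals are not exposed remotely.
        if part.startswith("_"):
            raise AttributeError(f"private name not exposed: {part!r}")
        target = getattr(target, part)
    if not callable(target):
        raise TypeError(f"{method_name!r} is not callable")
    return target


handler = get_request_handler(math, "sqrt")
value = handler(9.0)
```

Rejecting names with a leading underscore is a cheap guard, but a server exposed to untrusted clients would normally use an explicit allow-list instead.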

qulab.serialize module

qulab.serialize.encode_excepion(e: Exception) → bytes[source]
qulab.serialize.pack(obj: Any) → bytes[source]

Serialize an object to bytes.

qulab.serialize.packz(obj: Any) → bytes[source]

Serialize and compress.

qulab.serialize.register(cls: type, encode: Callable[[cls], bytes] = <built-in function dumps>, decode: Callable[[bytes], cls] = <built-in function loads>) → None[source]

Register a serializable type.

Parameters
  • cls – the type to register

  • encode – callable that translates an object of type cls into bytes; default: pickle.dumps

  • decode – callable that translates bytes into an object of type cls; default: pickle.loads
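The registration pattern described above can be sketched as a small type registry that falls back to pickle for unregistered types. The names register_sketch, encode_obj and decode_obj are hypothetical, and how qulab.serialize stores or tags registered types internally is not shown here.

```python
import pickle
from fractions import Fraction

_registry = {}  # cls -> (encode, decode)


def register_sketch(cls, encode=pickle.dumps, decode=pickle.loads):
    """Remember how to turn objects of type cls into bytes and back."""
    _registry[cls] = (encode, decode)


def encode_obj(obj):
    # Unregistered types fall back to pickle.
    encode, _ = _registry.get(type(obj), (pickle.dumps, pickle.loads))
    return encode(obj)


def decode_obj(cls, buff):
    _, decode = _registry.get(cls, (pickle.dumps, pickle.loads))
    return decode(buff)


# Register a compact text encoding for Fraction instead of the pickle default.
register_sketch(
    Fraction,
    encode=lambda f: f"{f.numerator}/{f.denominator}".encode("ascii"),
    decode=lambda b: Fraction(b.decode("ascii")),
)

buff = encode_obj(Fraction(3, 4))
restored = decode_obj(Fraction, buff)
```

A custom encoder is mainly useful when the default pickle output is too large or cannot be produced for the type at all.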

qulab.serialize.unpack(buff: bytes) → Any[source]

Deserialize bytes back into an object.

qulab.serialize.unpackz(buff: bytes) → Any[source]

Decompress and deserialize.
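The relationship between the plain and the compressed variants can be sketched as follows. The functions carry a _sketch suffix because they are stand-ins: pickle and zlib are assumptions chosen for illustration, and the actual wire format and compression used by qulab.serialize are not specified on this page.

```python
import pickle
import zlib


def pack_sketch(obj):
    """Serialize (assumption: pickle stands in for the real format)."""
    return pickle.dumps(obj)


def unpack_sketch(buff):
    return pickle.loads(buff)


def packz_sketch(obj):
    """Serialize, then compress the resulting bytes."""
    return zlib.compress(pack_sketch(obj))


def unpackz_sketch(buff):
    """Decompress first, then deserialize."""
    return unpack_sketch(zlib.decompress(buff))


data = {"trace": [0] * 10000}
plain = pack_sketch(data)
compressed = packz_sketch(data)
round_trip = unpackz_sketch(compressed)
```

Compression pays off for large, repetitive payloads such as sampled traces; for short messages the plain variant avoids the extra CPU cost.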

qulab.utils module

qulab.utils.IEEE_488_2_BinBlock(datalist, dtype='int16', is_big_endian=True)[source]

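Pack a list of numbers into an IEEE 488.2 standard binary block.

Parameters
  • datalist – list of numbers to pack

  • dtype – data type

  • is_big_endian – byte order; True selects big-endian

Returns

binblock, header: the binary block and its 'header'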
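An IEEE 488.2 definite-length block consists of "#", one digit giving the number of digits in the length field, the payload length in bytes, and then the raw payload. The sketch below builds such a block with struct; make_binblock and its fmt parameter are hypothetical and do not reproduce the signature or the dtype handling of IEEE_488_2_BinBlock.

```python
import struct


def make_binblock(datalist, fmt="h", big_endian=True):
    """Build a definite-length block; fmt is a struct code ('h' is int16)."""
    order = ">" if big_endian else "<"
    payload = struct.pack(f"{order}{len(datalist)}{fmt}", *datalist)
    length = str(len(payload))
    # The digit count must fit in a single character, so payloads stay below 10**9 bytes.
    header = f"#{len(length)}{length}".encode("ascii")
    return header + payload, header


binblock, header = make_binblock([1, 2, 3])
```

Here three int16 values give a 6-byte payload, so the header is "#16": one digit of length, and that digit is 6.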

qulab.utils.ShutdownBlocker(*args, **kwds)
qulab.utils.acceptArg(f, name, keyword=True)[source]

Test if argument is acceptable by function.

Parameters
  • f – the callable to inspect

  • name (str) – argument name
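A check of this kind can be built on inspect.signature. The sketch covers only the keyword case, because the exact meaning of the keyword flag is not documented here; accepts_keyword is a hypothetical function, not the acceptArg source.

```python
import inspect


def accepts_keyword(f, name):
    """Return True if f can be called with name=... as a keyword argument."""
    params = inspect.signature(f).parameters
    # A **kwargs catch-all accepts any keyword.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return True
    param = params.get(name)
    return param is not None and param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )


def measure(channel, *, averages=1):
    return channel, averages


def passthrough(**kwargs):
    return kwargs


checks = (
    accepts_keyword(measure, "averages"),
    accepts_keyword(measure, "timeout"),
    accepts_keyword(passthrough, "timeout"),
)
```

This kind of test lets a caller pass optional arguments only to the functions that declare them.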

qulab.utils.getHostIP()[source]

Get the IP address of the local host.

qulab.utils.getHostIPv6()[source]

Get the IPv6 address of the local host.

qulab.utils.getHostMac()[source]

Get the MAC address of the local host.

qulab.utils.randomID()[source]

Generate a random msg ID.

qulab.utils.retry(exception_to_check, tries=4, delay=0.5, backoff=2, logger=None)[source]

Retry calling the decorated function using an exponential backoff.

Parameters
  • exception_to_check – the exception to check; may be a tuple of exceptions to check

  • tries (int) – number of times to try (not retry) before giving up

  • delay (float, int) – initial delay between retries in seconds

  • backoff (int) – backoff multiplier e.g. value of 2 will double the delay each retry

  • logger (logging.Logger) – logger to use. If None, print
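The parameters above can be read as the following minimal sketch of an exponential-backoff decorator. retry_sketch is a hypothetical reimplementation written from the parameter descriptions, not the qulab.utils.retry source; in particular, the wording of the logged message is an assumption.

```python
import functools
import time


def retry_sketch(exception_to_check, tries=4, delay=0.5, backoff=2, logger=None):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            remaining, wait = tries, delay
            while remaining > 1:
                try:
                    return f(*args, **kwargs)
                except exception_to_check as exc:
                    msg = f"{exc!r}, retrying in {wait} seconds..."
                    if logger is not None:
                        logger.warning(msg)
                    else:
                        print(msg)
                    time.sleep(wait)
                    remaining -= 1
                    wait *= backoff  # each retry waits longer than the last
            # Last attempt: let any exception propagate to the caller.
            return f(*args, **kwargs)
        return wrapper
    return decorator


attempts = []


@retry_sketch(ValueError, tries=3, delay=0.01)
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ValueError("not yet")
    return "ok"


outcome = flaky()
```

With tries=3 the function is called at most three times in total, which matches the "number of times to try (not retry)" wording: two failures are retried and the third call decides the result.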