Source code for qulab.utils

# -*- coding: utf-8 -*-
import contextlib
import functools
import inspect
import os
import socket
import struct
import time
import uuid
from functools import wraps
from hashlib import sha1

import numpy as np


[docs]def IEEE_488_2_BinBlock(datalist, dtype="int16", is_big_endian=True): """ 将一组数据打包成 IEEE 488.2 标准二进制块 Args: datalist : 要打包的数字列表 dtype : 数据类型 endian : 字节序 Returns: binblock, header 二进制块, 以及其 'header' """ types = {"b" : ( int, 'b'), "B" : ( int, 'B'), "h" : ( int, 'h'), "H" : ( int, 'H'), "i" : ( int, 'i'), "I" : ( int, 'I'), "q" : ( int, 'q'), "Q" : ( int, 'Q'), "f" : (float, 'f'), "d" : (float, 'd'), "int8" : ( int, 'b'), "uint8" : ( int, 'B'), "int16" : ( int, 'h'), "uint16" : ( int, 'H'), "int32" : ( int, 'i'), "uint32" : ( int, 'I'), "int64" : ( int, 'q'), "uint64" : ( int, 'Q'), "float" : (float, 'f'), "double" : (float, 'd'), "float32": (float, 'f'), "float64": (float, 'd') } # yapf: disable datalist = np.asarray(datalist) datalist.astype(types[dtype][0]) if is_big_endian: endianc = '>' else: endianc = '<' datablock = struct.pack( '%s%d%s' % (endianc, len(datalist), types[dtype][1]), *datalist) size = '%d' % len(datablock) header = '#%d%s' % (len(size), size) return header.encode() + datablock, header
[docs]@functools.lru_cache(maxsize=1) def getHostIP(): """ 获取本机 ip 地址 """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] except OSError: return '127.0.0.1' finally: s.close() return ip
[docs]@functools.lru_cache(maxsize=1) def getHostIPv6(): """ 获取本机 ipv6 地址 """ try: s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) s.connect(('2001:4860:4860::8888', 80, 0, 0)) ip = s.getsockname()[0] except OSError: return '::1' finally: s.close() return ip
[docs]@functools.lru_cache(maxsize=1) def getHostMac(): """ 获取本机 mac 地址 """ mac = uuid.UUID(int=uuid.getnode()).hex[-12:] return ":".join([mac[e:e + 2] for e in range(0, 11, 2)])
[docs]def randomID(): """ Generate a random msg ID. """ msgID = sha1(os.urandom(32)).digest() return msgID
[docs]def acceptArg(f, name, keyword=True): """ Test if argument is acceptable by function. Args: f: callable function name: str argument name """ sig = inspect.signature(f) for param in sig.parameters.values(): if param.name == name and param.kind != param.VAR_POSITIONAL: return True elif param.kind == param.VAR_KEYWORD: return True elif param.kind == param.VAR_POSITIONAL and not keyword: return True return False
[docs]def retry(exception_to_check, tries=4, delay=0.5, backoff=2, logger=None): """Retry calling the decorated function using an exponential backoff. Args: exception_to_check (Exception): the exception to check. may be a tuple of exceptions to check tries (int): number of times to try (not retry) before giving up delay (float, int): initial delay between retries in seconds backoff (int): backoff multiplier e.g. value of 2 will double the delay each retry logger (logging.Logger): logger to use. If None, print """ def deco_retry(func): @wraps(func) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay while mtries > 1: try: return func(*args, **kwargs) except exception_to_check as exc: msg = "%s, Retrying in %s seconds..." % (str(exc), mdelay) if logger: logger.warning(msg) time.sleep(mdelay) mtries -= 1 mdelay *= backoff return func(*args, **kwargs) return f_retry # true decorator return deco_retry
@contextlib.contextmanager def _WindowsShutdownBlocker(title='Python script'): """ Block Windows shutdown when you do something important. """ from ctypes import CFUNCTYPE, c_bool, c_uint, c_void_p, c_wchar_p, windll import win32con import win32gui def WndProc(hWnd, message, wParam, lParam): if message == win32con.WM_QUERYENDSESSION: return False else: return win32gui.DefWindowProc(hWnd, message, wParam, lParam) CALLBACK = CFUNCTYPE(c_bool, c_void_p, c_uint, c_void_p, c_void_p) wc = win32gui.WNDCLASS() wc.style = win32con.CS_GLOBALCLASS | win32con.CS_VREDRAW | win32con.CS_HREDRAW wc.lpfnWndProc = CALLBACK(WndProc) wc.hbrBackground = win32con.COLOR_WINDOW + 1 wc.lpszClassName = "block_shutdown_class" win32gui.RegisterClass(wc) hwnd = win32gui.CreateWindow(wc.lpszClassName, title, win32con.WS_OVERLAPPEDWINDOW, 50, 50, 100, 100, 0, 0, win32gui.GetForegroundWindow(), None) win32gui.ShowWindow(hwnd, win32con.SW_HIDE) windll.user32.ShutdownBlockReasonCreate.argtypes = [c_void_p, c_wchar_p] windll.user32.ShutdownBlockReasonCreate.restype = c_bool windll.user32.ShutdownBlockReasonCreate( hwnd, "Important work in processing, don't shutdown :-(") yield windll.user32.ShutdownBlockReasonDestroy.argtypes = [c_void_p] windll.user32.ShutdownBlockReasonDestroy.restype = c_bool windll.user32.ShutdownBlockReasonDestroy(hwnd) win32gui.DestroyWindow(hwnd) win32gui.UnregisterClass(wc.lpszClassName, None) @contextlib.contextmanager def _FakeShutdownBlocker(title='Python script'): yield if os.name == 'nt': ShutdownBlocker = _WindowsShutdownBlocker else: ShutdownBlocker = _FakeShutdownBlocker