pymprpc.client package

Module contents

mprpc的python客户端.

Python客户端只支持python3.6+,并提供同步接口SyncRPC和异步接口AsyncRPC

  • File: client.__init__.py

  • Version: 0.5

  • Author: hsz

  • Email: hsz1273327@gmail.com

  • Copyright: 2018-02-08 hsz

  • License: MIT

  • History

    • 2018-01-23 created by hsz
    • 2018-02-08 version-0.5 by hsz

异步接口使用方法

import asyncio
from pymprpc.client import AsyncRPC


async def main(loop):
    async with AsyncRPC(
            addr="tcp://admin:admin@127.0.0.1:5000",
            loop=loop,
            debug=True) as rpc:
        print(await rpc.system.listMethods())
        print(await rpc.system.methodSignature("testclassmethod"))
        print(await rpc.system.methodHelp("testfunc"))
        print(await rpc.system.lenConnections())
        print(await rpc.system.lenUndoneTasks())
        print(await rpc.testclassmethod(1, 2))
        print(await rpc.testclasscoro(2, 3))
        print(await rpc.testcoro(5, 6))
        print(await rpc.testfunc(5, 4))
        # await asyncio.sleep(200)
        print("wait done")
        print(await rpc.testfunc())
    print("end")
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop))
except Exception as e:
    raise e

同步接口使用方法

import time
from pymprpc.client import RPC

with RPC(addr="tcp://admin:admin@127.0.0.1:5000",
         debug=True) as rpc:
    print(rpc.system.listMethods())
    print(rpc.system.methodSignature("testclassmethod"))
    print(rpc.system.methodHelp("testfunc"))
    print(rpc.system.lenConnections())
    print(rpc.system.lenUndoneTasks())
    print(rpc.testclassmethod(1, 2))
    print(rpc.testclasscoro(2, 3))
    print(rpc.testcoro(5, 6))
    print(rpc.testfunc(5, 4))
    agen = rpc.testcorogen(1, 2)
    for i in agen:
        print(i)
    time.sleep(200)
    print("wait done")
    # rpc.close()
    print(rpc.testfunc())
class pymprpc.client.AsyncRPC(addr: str, loop: Union[asyncio.events.AbstractEventLoop, NoneType] = None, debug: bool = False, compreser: Union[str, NoneType] = None, heart_beat: Union[int, NoneType] = None)[source]

Bases: pymprpc.mixins.encoder_decoder_mixin.EncoderDecoderMixin

异步的RPC客户端.

可以通过设置debug=True规定传输使用json且显示中间过程中的信息.

SEPARATOR

bytes

  • 协议规定的请求响应终止符
VERSION

str

  • 协议版本,以`x.x`的形式表现版本
COMPRESERS

Dict[str,model]

  • 支持的压缩解压工具
username

Optional[str]

  • 登录远端的用户名
password

Optional[str]

  • 登录远端的密码
hostname

str

  • 远端的主机地址
port

int

  • 远端的主机端口
loop

asyncio.AbstractEventLoop

  • 使用的事件循环
debug

bool

  • 是否使用debug模式
compreser

Optional[str]

  • 是否使用压缩工具压缩传输信息,以及压缩工具是什么
heart_beat

Optional[int]

  • 是否使用心跳机制确保连接不会因过期而断开
closed

bool

  • 客户端是否已经关闭或者还未开始运转
reader

asyncio.StreamReader

  • 流读取对象
writer

asyncio.StreamWriter

  • 流写入对象
tasks

Dict[str,asyncio.Future]

  • 远端执行的任务,保存以ID为键
gens

Dict[str,Any]

  • 远端执行的流返回任务,保存以ID为键
gens_res

Dict[str,List[Any]]

  • 远端执行的流返回任务的结果,保存以ID为键
remote_info

Dict[str,Any]

  • 通过验证后返回的远端服务信息
COMPRESERS = {'bz2': <module 'bz2' from '/Users/huangsizhe/LIB/CONDA/anaconda/lib/python3.6/bz2.py'>, 'lzma': <module 'lzma' from '/Users/huangsizhe/LIB/CONDA/anaconda/lib/python3.6/lzma.py'>, 'zlib': <module 'zlib' from '/Users/huangsizhe/LIB/CONDA/anaconda/lib/python3.6/lib-dynload/zlib.cpython-36m-darwin.so'>}
SEPARATOR = b'##PRO-END##'
VERSION = '0.1'
async_query(methodname, *args, **kwargs)[source]

异步调用一个远端的函数.

为调用创建一个ID,并将调用请求的方法名,参数包装为请求数据后编码为字节串发送出去.并创建一个Future对象占位.

Parameters:
  • methodname (str) –
    • 要调用的方法名
  • args (Any) –
    • 要调用的方法的位置参数
  • kwargs (Any) –
    • 要调用的方法的关键字参数
Returns:

  • 返回对应ID的Future对象

Return type:

(asyncio.Future)

clean()[source]

清理还在执行或者等待执行的协程.

close()[source]

关闭与远端的连接.

判断标志位closed是否为False,如果是则关闭,否则不进行操作

connect()[source]

与远端建立连接.

主要执行的操作有: + 将监听响应的协程_response_handler放入事件循环 + 如果有验证信息则发送验证信息 + 获取连接建立的返回

delay(methodname, *args, **kwargs)[source]

调用但不要求返回结果,而是通过系统方法getresult来获取.

Parameters:
  • methodname (str) –
    • 要调用的方法名
  • args (Any) –
    • 要调用的方法的位置参数
  • kwargs (Any) –
    • 要调用的方法的关键字参数
reconnect()[source]

断线重连.

send_query(ID, methodname, returnable, *args, **kwargs)[source]

将调用请求的ID,方法名,参数包装为请求数据后编码为字节串发送出去.

Parameters:
  • ID (str) –
    • 任务ID
  • methodname (str) –
    • 要调用的方法名
  • returnable (bool) –
    • 是否要求返回结果
  • args (Any) –
    • 要调用的方法的位置参数
  • kwargs (Any) –
    • 要调用的方法的关键字参数
Returns:

  • 准确地说没有错误就会返回True

Return type:

(bool)

class pymprpc.client.RPC(addr: str, debug: bool = False, compreser: Union[str, NoneType] = None)[source]

Bases: pymprpc.mixins.encoder_decoder_mixin.EncoderDecoderMixin

同步的RPC客户端.注意,是单线程实现,不支持心跳操作,也没有连接过期的检验.

可以通过设置debug=True规定传输使用json且显示中间过程中的信息.

SEPARATOR

bytes

  • 协议规定的请求响应终止符
VERSION

str

  • 协议版本,以`x.x`的形式表现版本
COMPRESERS

Dict[str,model]

  • 支持的压缩解压工具
username

Optional[str]

  • 登录远端的用户名
password

Optional[str]

  • 登录远端的密码
hostname

str

  • 远端的主机地址
port

int

  • 远端的主机端口
debug

bool

  • 是否使用debug模式
compreser

Optional[str]

  • 是否使用压缩工具压缩传输信息,以及压缩工具是什么
closed

bool

  • 客户端是否已经关闭或者还未开始运转
reader

asyncio.StreamReader

  • 流读取对象
writer

asyncio.StreamWriter

  • 流写入对象
tasks

Dict[str,asyncio.Future]

  • 远端执行的任务,保存以ID为键
gens

Dict[str,Any]

  • 远端执行的流返回任务,保存以ID为键
gens_res

Dict[str,List[Any]]

  • 远端执行的流返回任务的结果,保存以ID为键
remote_info

Dict[str,Any]

  • 通过验证后返回的远端服务信息
COMPRESERS = {'bz2': <module 'bz2' from '/Users/huangsizhe/LIB/CONDA/anaconda/lib/python3.6/bz2.py'>, 'lzma': <module 'lzma' from '/Users/huangsizhe/LIB/CONDA/anaconda/lib/python3.6/lzma.py'>, 'zlib': <module 'zlib' from '/Users/huangsizhe/LIB/CONDA/anaconda/lib/python3.6/lib-dynload/zlib.cpython-36m-darwin.so'>}
SEPARATOR = b'##PRO-END##'
VERSION = '0.1'
close()[source]

关闭与远端的连接.

判断标志位closed是否为False,如果是则关闭,否则不进行操作

connect()[source]

与远端建立连接,并进行验证身份.

query(methodname, *args, **kwargs)[source]

异步调用一个远端的函数.

为调用创建一个ID,并将调用请求的方法名,参数包装为请求数据后编码为字节串发送出去.并创建一个Future对象占位.

Parameters:
  • methodname (str) –
    • 要调用的方法名
  • args (Any) –
    • 要调用的方法的位置参数
  • kwargs (Any) –
    • 要调用的方法的关键字参数
Returns:

  • 返回对应ID的Future对象

Return type:

(asyncio.Future)

reconnect()[source]

断线重连.

send_query(ID, methodname, *args, **kwargs)[source]

将调用请求的ID,方法名,参数包装为请求数据后编码为字节串发送出去.

Parameters:
  • ID (str) –
    • 任务ID
  • methodname (str) –
    • 要调用的方法名
  • args (Any) –
    • 要调用的方法的位置参数
  • kwargs (Any) –
    • 要调用的方法的关键字参数
Returns:

  • 准确地说没有错误就会返回True

Return type:

(bool)