Welcome to pymprpc’s documentation!

这是Message-pack Rpc的一个python实现,

安装

pip install pymprpc

使用方式

这个rpc的实现参考了标准库xmlrpc,通过特定的注册函数,注册装饰器实现对函数的注册. 客户端和服务端都可以通过上下文管理器或者异步上下文管理连接.

服务器端

要注册函数需要先有一个服务器实例,我们通常使用上下文管理器来创建服务器实例, 并在上下文中让服务器执行,以便在关闭服务器时可以比较简单的回收资源.

其中:

  • register_introspection_functions 用于注册默认定义的自省函数
  • register_function 装饰器,用于装饰函数,协程函数,或者异步生成器函数,将他们注册到可以调用的位置.
  • register_instance 用于注册某个类的实例,会将其中的公开函数都注册到可调用位置.

要启动服务器,则可以执行`run_forever`方法.当要退出的时候,上下文管理器会自己关闭服务并做清理工作.

with MyMPRPCServer(("127.0.0.1", 5000), debug=True) as server:
    server.register_introspection_functions()

    @server.register_function()
    def testfunc(a, b):
        """有help
        """
        return a + b

    @server.register_function()
    async def testcoro(a, b):
        await asyncio.sleep(0.1)
        return a + b

    @server.register_function()
    async def testcorogen(a, b):
        for i in range(10):
            await asyncio.sleep(0.1)
            yield i + a + b

    class TestClass:

        def testclassmethod(self, a, b):
            return a + b

        async def testclasscoro(self, a, b):
            await asyncio.sleep(0.1)
            return a + b

        async def testclasscorogen(self, a, b):
            for i in range(10):
                await asyncio.sleep(0.1)
                yield i + a + b

    t = TestClass()
    server.register_instance(t)
    server.run_forever()

需要注意的是:

  • 返回流的情况只支持异步生成器
  • 注册的函数默认使用多进程执行器执行,以防止cpu密集型任务阻塞事件循环.
  • 服务并没有提供多进程实现,这主要是因为协程对io的利用率已经很高,对于RPC来说已经很足够.如果返回延迟高应该是计算密集型任务占用事件过长.

客户端

客户端有两个实现,一个是使用协程接口的异步实现`AsyncRpc`,一个是同步实现.需要注意的是同步实现没有使用多线程等技术,因此无法使用心跳,也无法监控到连接过期. 客户端调用直接使用`.`符号即可.异步客户端需要`await`一下其他都一样.

当然客户端也支持使用`connect`,`close`接口手动控制连接,也提供了`reconnect`这样的接口用于重连,比如用在一些web框架中时,手动操作可能是个更好的选择. 相比较而言,更加推荐使用异步接口,因为它功能更加全面.

同步

同步接口只要直接调用即可,如果返回的是流,那么获得的结果将会是一个生成器,可以通过遍历来获取全部值.

with RPC(addr="tcp://admin:admin@127.0.0.1:5000",
         debug=True) as rpc:
    print(rpc.system.listMethods())
    print(rpc.system.methodSignature("testclassmethod"))
    print(rpc.system.methodHelp("testfunc"))
    print(rpc.system.lenConnections())
    print(rpc.system.lenUndoneTasks())
    print(rpc.testclassmethod(1, 2))
    print(rpc.testclasscoro(2, 3))
    print(rpc.testcoro(5, 6))
    print(rpc.testfunc(5, 4))
    agen = rpc.testcorogen(1, 2)
    for i in agen:
        print(i)

异步

异步接口有一个需要注意的点,就是如果返回的是流,那么会被包装成一个异步生成器, 需要使用`async for`语句来调用.另外异步接口允许不返回,这样一些不要返回值的方法比如邮件发送什么的就可以直接这样调用以提高效率.

async with AsyncRPC(
        addr="tcp://admin:admin@127.0.0.1:5000",
        loop=loop,
        debug=True,
        heart_beat=40) as rpc:
    print(await rpc.system.listMethods())
    print(await rpc.system.methodSignature("testclassmethod"))
    print(await rpc.system.methodHelp("testfunc"))
    print(await rpc.system.lenConnections())
    print(await rpc.system.lenUndoneTasks())
    print(await rpc.testclassmethod(1, 2))
    print(await rpc.testclasscoro(2, 3))
    print(await rpc.testcoro(5, 6))
    print(await rpc.testfunc(5, 4))
    agen = await rpc.testcorogen(1, 2)
    async for i in agen:
        print(i)

TODO

  • 负载均衡中间件

Indices and tables