最不喜欢在Tornado中使用任何同步阻塞型的东西,不想让ioloop阻塞在某个IO调用上,因为单线程的东西任何阻塞都是代价很高的,除非你的数据库被优化的性能很好,速度很快。除了之前的线程池之外,直接使用异步库也是不错的选择,Motor就是Tornado里可以用的很好的异步库,它兼容Tornado的gen.coroutine式的异步调用形式,主要使用了greenlet来巧妙的封装PyMongo的同步API, 把底层的socketIO进行了异步化的处理,化同步为异步。

从使用的例子来分析Motor是如何把PyMongo的API异步化的:

client = motor.MotorClient(...)
db = client['testDB']
...
class SomeHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        ...
        doc = yield db.test_collection.find_one({'i': {'$lt': 1}})
        ...

以上只是简单的示例使用Motor的大概流程,当我们通过MotorClient生成client的时候,Motor将自己实现的MotorPool作为pool_class传给PyMongo的MongoClient的构造函数,这样连接池就被替换成了Motor自己的连接池,而MotorPool在通信过程中则使用了Motor自己封装的异步socket。

Motor除了在底层上替换原有的通信过程,另外对PyMongo的API做了异步化的处理,使之兼容Tornado的异步调用过程。和PyMongo使用的不同就是像代码里展示的那样,类似Tornado里其他的异步调用一样,通过yield出一个Future来暂时挂起上下文,当Future被set_result的时候,在ioloop上注册callback来恢复上下文。也就是说:doc = yield db.test_collection.find_one({'i': {'$lt': 1}})这句并不会阻塞ioloop,而是暂时挂起了。Motor做到这点主要是利用了greenlet来封装原始的PyMongo的同步API。

看一下类似find_one这样的PyMongo的操作API(其余还有findinsert等等)是怎样被异步化的:

def asynchronize(motor_class, framework, sync_method, has_write_concern, doc=None):
    @functools.wraps(sync_method)
    def method(self, *args, **kwargs):
        check_deprecated_kwargs(kwargs)
        loop = self.get_io_loop() #比如: Tornado的IOLoop
        callback = kwargs.pop('callback', None)

        if callback:
            if not callable(callback):
                raise callback_type_error
            future = None
        else:
            #在Tornado的gen.coroutine中yield出来
            future = framework.get_future(self.get_io_loop())

        #call_method是实际的对PyMongo的sync_method的封装,
        #这个方法的调用将在一个子greenlet中进行
        def call_method():
            try:
                #sync_method 是实际的同步的PyMongo的同步调用
                #也许会疑惑:即使在子greenlent中运行,不依然会阻塞吗?
                #稍后会解决这个疑惑
                result = sync_method(self.delegate, *args, **kwargs)
                #给yield出去的future赋予结果,可以恢复外部的上下文(gen.coroutine)
                if callback:
                    # Schedule callback(result, None) on main greenlet.
                    framework.call_soon(
                        loop,
                        functools.partial(callback, result, None))
                else:
                    # Schedule future to be resolved on main greenlet.
                    framework.call_soon(
                        loop,
                        functools.partial(future.set_result, result))
            except Exception as e:
                if callback:
                    framework.call_soon(
                        loop,
                        functools.partial(callback, None, e))
                else:
                    # TODO: we lost Tornado's set_exc_info. Frameworkify this.
                    framework.call_soon(
                        loop,
                        functools.partial(future.set_exception, e))

        #在一个子greenlet中运行这个同步的方法封装
        #注意: 这个greenlet是我们在当前方法中生成的,也就是说我们当前的方法所处的
        #greenlet是我们生成的greenlet的父greenlet,这个在后续的上下文切换中非常重要
        greenlet.greenlet(call_method).switch() #切换到子greenlet执行
        #返回future
        return future
    ...
    return method

以上代码会封装PyMongo的同步操作,使之可以在Tornado的gen.coutoine中yield出来,参数sync_method就是原始的PyMongo的同步调用。通过注释理解这种封装的方法,原理其实就是:把同步调用放到一个子greenlet中去执行,当子greenlet阻塞时,切换上下文回父greenlet执行,也就是yield出future,这样就不会阻塞主ioloop了。最关键的问题开始:1)什么时候切换回父greenlet? 2)当数据到达,阻塞解除时,再怎么回到子greenlet继续执行?

我们说过,Motor用自己的MotorPool取代了PyMongo的连接池,回看下之前代码,比如我们调用find_one,我们会在子greenlet中执行,而find_one里则会走PyMongo的同步的那一套逻辑,但是当代码走到最底层的实际的socket的IO时,就走Motor的socket的IO了,因为连接池已经被Motor移花接木,换成了自己的那一套。

当sync_method里的同步逻辑走到socket的IO时,原有的socket.recv被Motor用异步的recv代替了:

@tornado_motor_sock_method
def recv(self, num_bytes):
    #借助了Tornado的IOStream的异步读写封装
    #IOStream的异步读写不会阻塞,而是一种事件驱动的异步非阻塞IO
    future = stream_method(self.stream, 'read_bytes', num_bytes)
    try:
        if self.timeout_td:
            result = yield _Wait(future, self.io_loop, self.timeout_td, timeout_exc)
        else:
            result = yield future
    except IOError as e:
        raise socket.error(str(e))
    raise gen.Return(result)

从以上针对socket的recv方法看出,通过Tornado的异步读写封装(IOStream),实现了异步的socket,目前关键的就是tornado_motor_sock_method了,很明显,这个装饰器将完成greenlet上下文的父子切换,解决我们之前在sync_method调用时的疑惑。

看一下tornado_motor_sock_method这个装饰器的实现:

def tornado_motor_sock_method(method):
    coro = gen.coroutine(method)

    @functools.wraps(method)
    def wrapped(self, *args, **kwargs):
        #当前greenlet是一个子greenlet
        child_gr = greenlet.getcurrent()
        #获取当前greenlet的父greenlet,即之前代码提到过的asynchronize所在的greenlet
        main = child_gr.parent

        def callback(future):
            if future.exc_info():
                child_gr.throw(*future.exc_info())
            elif future.exception():
                child_gr.throw(future.exception())
            else:
                #当future的结果到达,切换回挂起的子greenlet
                child_gr.switch(future.result())

        #保证callback在当前greenlet的父greenlet中运行
        self.io_loop.add_future(coro(self, *args, **kwargs), callback)
        #return这句会暂时挂起当前greenlet,将控制权切换回父greenlet,
        #在上面的callback执行时,才会切换回当前greenlet,return语句返回
        return main.switch()
    return wrapped

find_one调用为例,find_one的一系列调用都在一个greenlet中进行,比如最终走到了resut = socket.recv(...)这句,通过以上的代码可以发现,这句会暂时被挂起这个greenlet,并把控制权暂时切换回当前greenlet的父greenlet,也就是asynchronize方法所在的主greenlet,而回看之前的asynchronize的代码,发现切换回去之后,asynchronize调用立即返回future,也就是doc = yield db.test_collection.find_one(...)会yield出这个future,这样,ioloop没有被阻塞。而当socket上数据到达时,我们会通过在这句self.io_loop.add_future(coro(self, *args, **kwargs), callback)里添加的callback切换回挂起的子greenlet,也就是return main.switch()这句会返回,result = socket.recv(...)这句就恢复执行,这样,刚才挂起的greenlet就继续往下执行了,最终执行到asynchronize里的call_method里的result = sync_method(self.delegate, *args, **kwargs)这句返回,再回顾之前的asynchronize的逻辑,这句返回后,通过framework.call_soon(loop, functools.partial(future.set_result, result))这句,yield出去的future被set_result,这样,doc = yield db.test_collection.find_one(...)挂起的上下文稍后也会恢复执行了。

以上就是Motor利用Tornado的ioloop和iostream以及greenlet来封装PyMongo并将其异步化的过程。

另附, Motor作者在自己博客上的介绍文章:
Introducing Motor, an asynchronous MongoDB driver for Python and Tornado
Motor Internals: How I Asynchronized a Synchronous Library

文章目录