twisted系列教程十九–cancel deferred

twisted系列教程十九–cancel deferred

首页战争策略代号Midnight更新时间:2024-08-01

原文:http://krondo.com/blog/?p=1209

Chinese by Yang Xiaowei and Cheng Luo

Introduction

twisted 是一个正在发展的项目,twisted 的开发者们会添加一些新的特色或者扩展旧的.随着twisted 10.1.0 的发布,开发者们增加了一个新的功能–取消,这个就是我们今天要讲的内容.

异步的程序把request 和response 进行了解耦,于是增加了一个新的可能:在发送一个请求和得到返回结果之间你可以决定是否你还要返回结果.细想一下第十四部分的poetry proxy server.下面是这个代理怎么工作的(第一次没有缓存的时候):

一个请求进来

这个proxy 连接真正的server去获取诗

一但获取到完整的诗,发送给client

一切工作的很好,但是假如client在得到诗之前挂起了怎么办?这样的话我们的proxy 就会被卡住.在出现这种情况的时候我们最好的办法就是关掉连接.

回想一下图片十五,一个描述了同步程序中概念上的流程控制的图表.在这个图片中我们可以看到函数调用往下走,异常往上走.假如我们想取消一个同步的调用,程序的流程会和函数调用的方向是一样的,从高层代码到底层代码.图片三十八描述了这个过程:

图片三十八

当然,在一个同步的程序中这个是不可能的因为高层的代码不会恢复运行直到底层的代码运行完,在这时就没有什么可以取消的了.但是在一个异步的程序中,高层的代码在底层的代码运行完之前控制着程序,这就让我们有能力取消底层次的请求.

在一个twisted 程序中,底层的请求是被deferred对象所体现的.在deferred 中信息流是向下的,从底层的代码到高层的代码,这个和同步程序中return 信息流是相同的.从twsited 10.1.0 开始,高层的代码可以向底层的代码传送信息–它可以告诉底层的代码它不再想要返回的结果了.看图片三十九:

图片三十九

Canceling Deferreds

先让我们看一些例子来搞明白取消deferred 是怎样工作的.记住,运行这一部分的代码你需要将twisted 升级到10.1.0 .首先deferred-cancel/defer-cancel-1.py:

from twisted.internet import defer

def callback(res):

print 'callback got:', res

d = defer.Deferred()

d.addCallback(callback)

d.cancel()

print 'done'

利用这个取消的特性,deferred 类增加了一个cancel 的方法.这个例子程序创建一个deferred,增加一个callback,然后在触发这个deferred之前取消它.下面是输出:

done

Unhandled error in Deferred:

Traceback (most recent call last):

Failure: twisted.internet.defer.CancelledError:

ok,取消一个deferred 触发了errback,我们正常的callback 没有被调用.并且出现的错误是twisted.internet.defer.CancelledError.让我们在deferred-cancel/defer-cancel-2.py增加一个errback:

from twisted.internet import defer

def callback(res):

print 'callback got:', res

def errback(err):

print 'errback got:', err

d = defer.Deferred()

d.addCallbacks(callback, errback)

d.cancel()

print 'done'

我们得到如下的输出:

errback got: [Failure instance: Traceback (failure with no frames): :

]

done

我们可以捕捉到这个CancelledError 就像捕捉其他的deferred 的错误一样.

下面让我们先触发deferred 然后再取消它,代码在deferred-cancel/defer-cancel-3.py:

from twisted.internet import defer

def callback(res):

print 'callback got:', res

def errback(err):

print 'errback got:', err

d = defer.Deferred()

d.addCallbacks(callback, errback)

d.callback('result')

d.cancel()

print 'done'

下面是输出:

callback got: result

done

我们的callback 被正常的触发了,然后我们的程序正常的结束.就好像cancel 没有被调用一样.看起来取消一个cancel 对一个已经触发的deferred 没有什么作用一样.(继续往下看).

如果我们取消deferred 之后再去触发deferred 会发生什么呢? 例子在deferred-cancel/defer-cancel-4.py

from twisted.internet import defer

def callback(res):

print 'callback got:', res

def errback(err):

print 'errback got:', err

d = defer.Deferred()

d.addCallbacks(callback, errback)

d.cancel()

d.callback('result')

print 'done'

在这种情况下我们得到如下的输出:

errback got: [Failure instance: Traceback (failure with no frames): :

]

done

很有意思,输出和第二个例子中的一样,就像根本没有触发deferred一样.所以假如deferred 已经被取消了,再去触发这个deferred 会没有什么用.但是为什么d.callback(‘result’) 会抛出一个异常,因为你不能触发一个已经被触发过的deferred?

再看一下图片三十九.触发一个deferred 是底层代码的任务,而取消一个deferred是高层代码的的动作.触发这个deferred 意味着”这里是你的结果”,而取消一个deferred 意味着”我不在需要它”.

cancel 方法主要做了两件事情:

告诉deferred 对象如果这个结果还没有来的话,我们就不要了.并忽略后面的callback或者errback

告诉底层的正在产生结果的代码去做一些取消操作需要做的事情

我们取消deferred 的行为是自由的,在取消之后,我们就不会得到那些还没有返回的结果(即使返回了我们也收不到).但是取消deferred 并不会最终取消异步的操作.取消一个异步的操作需要一系列的操作.你可能需要关闭一个连接,回滚一些事物,*掉一个自进程,等等. 因为deferred 只是一个callback 的组织者,它怎么知道当你取消它的时候会采取什么操作? 或者,它怎样把取消请求转发给底层的代码? 大声跟着我喊:

用Callback

Canceling Deferreds, Really

好的,让我们看一下deferred-cancel/defer-cancel-5.py:

from twisted.internet import defer

def canceller(d):

print "I need to cancel this deferred:", d

def callback(res):

print 'callback got:', res

def errback(err):

print 'errback got:', err

d = defer.Deferred(canceller)

# created by lower-level code

d.addCallbacks(callback, errback)

# added by higher-level code

d.cancel()

print 'done'

这段代码很像第二个例子,除了一个多了一个canceller callback,这个callback 是deferred创建的时候传入的,不是后面加入的.这个callback 负责执行要取消这个deferred 所要进行的一系列操作(仅仅这个deferred 被取消的时候).这个canceller callback 是底层代码返回deferred 不可缺少的一部分,不是用来接收deferred并添加callback 和errback 的高层代码.

运行这个代码,你会看到如下的输出:

I need to cancel this deferred:

errback got: [Failure instance: Traceback (failure with no frames): :

]

done

你可以看到,这个canceller callback被传入一个我们想取消的deferred.在canceller callback 中我们可以进行一些取消deferred要完成的一些附属操作.canceller callback 是在errback之前触发的.实际上,这时我们可以选择用正常的结果还是错误的结果触发deferred.所有的情况在 deferred-cancel/defer-cancel-6.py 和deferred-cancel/defer-cancel-7.py.

在把reactor引入进来之前,让我们再做一个简单的测试.我们创建了一个带有canceller 的deferred,正常的触发,并取消它,你可以在deferred-cancel/defer-cancel-8.py看到代码.通过检查这个例子的输出,你可以看到在取消一个已经触发的deferred之后并没有触发canceller callback.

我们上面看的例子都没有做些实际的异步操作.下面让我们创建一个简单的触发异步操作的程序,然后我们会弄明白怎样让这个异步操作变为可以取消的.看一下代码 deferred-cancel/defer-cancel-9.py:

from twisted.internet.defer import Deferred

def send_poem(d):

print 'Sending poem'

d.callback('Once upon a midnight dreary')

def get_poem():

"""Return a poem 5 seconds later."""

from twisted.internet import reactor

d = Deferred()

reactor.callLater(5, send_poem, d)

return d

def got_poem(poem):

print 'I got a poem:', poem

def poem_error(err):

print 'get_poem failed:', err

def main():

from twisted.internet import reactor

reactor.callLater(10, reactor.stop)

# stop the reactor in 10 seconds

d = get_poem()

d.addCallbacks(got_poem, poem_error)

reactor.run()

main()

这个例子包含了一个get_poem 函数,get_poem利用callLater 方法五秒钟之后返回一首诗.main 函数调用get_poem,并添加了一个callback/errback 对,并启动reactor.

运行这个例子会有如下的输出:

Sending poem

I got a poem: Once upon a midnight dreary

让我们试着在这首诗返回之前取消deferred.我们加上一些代码在两秒钟之后取消deferred.:

reactor.callLater(2, d.cancel) # cancel after 2 seconds

完整的程序在deferred-cancel/defer-cancel-10.py,会输出如下的内容:

get_poem failed: [Failure instance: Traceback (failure with no frames): :

]

Sending poem

这个例子清晰的描述了取消一个deferred并不一定会取消底层的异步请求.在两秒之后我们从errback看到了输出了CancelledError,在五秒之后我们仍旧会看到send_poem 的输出(但是接下来的callback没有触发).

这和例子四deferred-cancel/defer-cancel-4.py.取消deferred 导致最后的结果被忽略,但是真正意义上说并没有完全终止操作.根据我们上面讲的,要想彻底取消一个deferred你必须在deferred 创建的时候向其传入一个cancel callback.

这个被传入的callback需要做些什么呢?看一下callLater 的手册.callLater 返回的值是一个实现了IDelayedCall,并带有一个cancel 方法的对象,这个cancel 方法可以用来组织延迟的call 被执行.

这样就比较简单了,更新后的代码在deferred-cancel/defer-cancel-11.py,主要的变化在get_poem 函数:

def get_poem():

"""Return a poem 5 seconds later."""

def canceler(d):

# They don't want the poem anymore,

# so cancel the delayed call

delayed_call.cancel()

# At this point we have three choices:

# 1. Do nothing, and the deferred will fire the errback

# chain with CancelledError.

# 2. Fire the errback chain with a different error.

# 3. Fire the callback chain with an alternative result.

d = Deferred(canceler)

from twisted.internet import reactor

delayed_call = reactor.callLater(5, send_poem, d)

return d

在这个新的版本中,我们保存了从callLater 返回的值从而让我们可以在callback中用到.我们的callback唯一需要做的是触发delayed_call.cancel().根据我们上面讲的,我们也可以选择我们自己去触发这个callback.最新版的例子的输出为:

get_poem failed: [Failure instance: Traceback (failure with no frames): :

]

你可以看到,deferred 被取消了,异步的操作确实被抛弃了.

Poetry Proxy 3.0

根据我们所讲的,poetry proxy server 是一个应用cancel deferred 的很好的场景,因为它可以允许我们放弃诗的下载假如没有人想要诗的话. proxy Version 3.0 代码在 twisted-server-4/poetry-proxy.py,实现了deferred 的取消.第一个变化的在 PoetryProxyProtocol:

class PoetryProxyProtocol(Protocol):

def connectionMade(self):

self.deferred = self.factory.service.get_poem()

self.deferred.addCallback(self.transport.write)

self.deferred.addBoth(lambda r: self.transport.loseConnection())

def connectionLost(self, reason):

if self.deferred is not None:

deferred, self.deferred = self.deferred, None

deferred.cancel() # cancel the deferred if it hasn't fired

和老一版本的相比.主要的变化是:

保存我们从get_poem 得到的deferred,以便我们可以以后用于取消deferred

当连接关闭的时候取消deferred.需要注意的是在我们实际上获取诗之后也会取消deferred,但是取消一个已经触发的deferred不会有什么影响

现在我们需要确认的是取消deferred 之后确实会放弃下载诗.为了这个我们也需要改变一下 ProxyService:

class ProxyService(object):

poem = None # the cached poem

def __init__(self, host, port):

self.host = host

self.port = port

def get_poem(self):

if self.poem is not None:

print 'Using cached poem.'

# return an already-fired deferred

return succeed(self.poem)

def canceler(d):

print 'Canceling poem download.'

factory.deferred = None

connector.disconnect()

print 'Fetching poem from server.'

deferred = Deferred(canceler)

deferred.addCallback(self.set_poem)

factory = PoetryClientFactory(deferred)

from twisted.internet import reactor

connector = reactor.connectTCP(self.host, self.port, factory)

return factory.deferred

def set_poem(self, poem):

self.poem = poem

return poem

和老版本的ProxyService,有如下变化:

我们保存了reactor.connectTCP 的返回值,它是一个IConnector 对象,我们可以用它提供的disconnect方法来关闭连接

我们创建了一个带有canceler callback 的deferred.这个callback 用connector 来关闭连接.但首先要先设置factory.deferred 属性为None.否则的话,这个factory可能会在这个deferred 被CancelledError触发之前触发这个deferred(发生连接错误的时候)

你可能已经注意到我们现在在ProxyService 创建这个deferred,而不是在PoetryClientFactory中.因为我们的canceler callback 需要操作IConnector 对象,ProxyService 就成了创建deferred 最方便的额地方.

在我们之前的的例子中,我们的canceler callback 都是作为一个闭包来实现的.闭包是非常有用的当我们要取消callback的时候.

让我们试一下我们的proxy.首先开启一个slow server.它需要足够慢让我们有时间来取消:

python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt

现在我们开启proxy:

python twisted-server-4/poetry-proxy.py --port 10000 10001

现在我们可以从proxy 下载诗了,可以用curl:

curl localhost:10000

几秒之后,按下Ctrl-C停止client,在运行proxy 的终端中你可以看到如下的输出:

Fetching poem from server.

Canceling poem download.

你可以看到我们的server 已经停止输出了,因为我们的proxy已经挂起了.

One More Wrinkle

我们上面已经说过了取消一个已经触发过的deferred 会没有作用.这并不是十分准确.在第十三部分我们了解到被添加到deferred 上的callback 和errback 会返回deferred.在这种情况下,外部的deferred 暂停执行它的cllback 链并等待内部的deferred 触发.

因此,即使一个deferred 触发了发起异步请求的高层的代码也不会接收到结果.因为这个callback 正在等待内部的deferred 实行完毕.如果高层的代码取消了外部的deferred会发生什么? 在这种情况下外部的deffered 不会取消自己,相反的它会取消内部的deferred.

所以当你取消一个deferred 的时候,你不是在取消主要的异步操作,而是在取消其他的被主要的异步操作触发的异步操作.(比较难懂哈 ,建议看英文).

我们可以用一个例子来描述这个过程.代码在deferred-cancel/defer-cancel-12.py:

from twisted.internet import defer

def cancel_outer(d):

print "outer cancel callback."

def cancel_inner(d):

print "inner cancel callback."

def first_outer_callback(res):

print 'first outer callback, returning inner deferred'

return inner_d

def second_outer_callback(res):

print 'second outer callback got:', res

def outer_errback(err):

print 'outer errback got:', err

outer_d = defer.Deferred(cancel_outer)

inner_d = defer.Deferred(cancel_inner)

outer_d.addCallback(first_outer_callback)

outer_d.addCallbacks(second_outer_callback, outer_errback)

outer_d.callback('result')

# at this point the outer deferred has fired, but is paused

# on the inner deferred.

print 'canceling outer deferred.'

outer_d.cancel()

print 'done'

在这个例子中我们创造了两个deferred,一个内部的一个外部的.首先我们触发外部的deferred,然后取消它.这个例子会有如下输出:

first outer callback, returning inner deferred

canceling outer deferred.

inner cancel callback.

outer errback got: [Failure instance: Traceback (failure with no frames): :

]

done

你可以看到,取消外部的deferred 不会引起外部的cancel callback 触发.相反的,它取消内部的deferred,所以内部deferred 的cancel callback会触发,然后外部的errback 会接收到一个CancelledError.

Discussion

取消一个deferred 是一个非常有用的操作,允许我们的程序去避免无谓的工作.但是也有一些取巧.

一个非常重要的要记住的事实是取消一个异步操作并不会一定取消根本的异步操作.实际上,大多数deferred 并不会真正的取消,因为大多数的twisted 代码还没有更新.检查文档或者源代码去看看是否取消deferred 会不会真正取消请求,或者只是忽略它.

第二个重要的事实是仅仅从异步的api 中返回一个deferred 并不会真正的取消deferred.假如你想在你的代码中实现取消deferred,你需要在在源代码中找更多的例子.

,
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved