Скрученный Python: итераторы и выходы /inlineCallbacks
Ребята, я полностью сбит с толку, так что, возможно, я даже не спрашиваю правильно, но здесь идет речь:
У меня есть искаженное приложение, использующее inlineCallbacks. Теперь мне нужно определить итератор, который будет означать, что генератор возвращается вызывающей стороне. Тем не менее, итератор не может быть украшен inlineCallbacks, не так ли? Если нет, то как я могу кодировать что-то вроде этого.
Просто чтобы уточнить: цель состоит в том, что process_loop нужно вызывать каждые, скажем, 5 секунд, он может обработать только ОДИН кусок, скажем, 10, а затем он должен отпустить. Тем не менее, чтобы знать, что кусок 10 (хранится в кэшированном, что является диктатом), необходимо вызвать функцию, которая возвращает отложенное значение.
@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
for cachename,cachevalue in cached.items():
result = yield (call func here which returns deferred)
if result is True:
for k,v in cachedvalue.items():
yield cachename, k, v
@inlineCallbacks
def process_chunk(myiter, num):
try:
for i in xrange(num):
nextval = myiter.next()
yield some_processing(nextval)
returnValue(False)
except StopIteration:
returnValue(True)
@inlineCallbacks
def process_loop(cached):
myiter = cacheiter(cached)
result = yield process_chunk(myiter, 10)
if not result:
print 'More left'
reactor.callLater(5, process_loop, cached)
else:
print 'All done'
3 ответа
Вы правы, что не можете выразить то, что хотите выразить в cacheiter
, inlineCallbacks
Декоратор не позволит вам иметь функцию, которая возвращает итератор. Если вы украсите функцию с ним, то результатом будет функция, которая всегда возвращает Deferred
, Вот для чего это.
Частично это усложняет то, что итераторы плохо работают с асинхронным кодом. Если в создании элементов вашего итератора участвует Deferred, то элементы, которые выходят из вашего итератора, сначала будут Deferreds.
Вы могли бы сделать что-то вроде этого, чтобы объяснить это:
@inlineCallbacks
def process_work():
for element_deferred in some_jobs:
element = yield element_deferred
work_on(element)
Это может сработать, но выглядит особенно странно. Поскольку генераторы могут уступать только своему вызывающему (а не, например, вызывающему), some_jobs
итератор ничего не может с этим поделать; только код лексически внутри process_work
может привести к отсрочке на inlineCallbacks
Батут, чтобы ждать.
Если вы не возражаете против этого шаблона, то мы можем представить, что ваш код написан примерно так:
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor
class cacheiter(object):
def __init__(self, cached):
self._cached = iter(cached.items())
self._remaining = []
def __iter__(self):
return self
@inlineCallbacks
def next(self):
# First re-fill the list of synchronously-producable values if it is empty
if not self._remaining:
for name, value in self._cached:
# Wait on this Deferred to determine if this cache item should be included
if (yield check_condition(name, value)):
# If so, put all of its values into the value cache so the next one
# can be returned immediately next time this method is called.
self._remaining.extend([(name, k, v) for (k, v) in value.items()])
# Now actually give out a value, if there is one.
if self._remaining:
returnValue(self._remaining.pop())
# Otherwise the entire cache has been visited and the iterator is complete.
# Sadly we cannot signal completion with StopIteration, because the iterator
# protocol isn't going to add an errback to this Deferred and check for
# StopIteration. So signal completion with a simple None value.
returnValue(None)
@inlineCallbacks
def process_chunk(myiter, num):
for i in xrange(num):
nextval = yield myiter.next()
if nextval is None:
# The iterator signaled completion via the special None value.
# Processing is complete.
returnValue(True)
# Otherwise process the value.
yield some_processing(nextval)
# Indicate there is more processing to be done.
returnValue(False)
def sleep(sec):
# Simple helper to delay asynchronously for some number of seconds.
return deferLater(reactor, sec, lambda: None)
@inlineCallbacks
def process_loop(cached):
myiter = cacheiter(cached)
while True:
# Loop processing 10 items from myiter at a time, until process_chunk signals
# there are no values left.
result = yield process_chunk(myiter, 10)
if result:
print 'All done'
break
print 'More left'
# Insert the 5 second delay before starting on the next chunk.
yield sleep(5)
d = process_loop(cached)
Другой подход, который вы можете использовать, это использовать twisted.internet.task.cooperate
, cooperate
берет итератор и потребляет его, предполагая, что его потребление потенциально дорого, и распределяя работу по нескольким итерациям реактора. Принимая определение cacheiter
сверху:
from twisted.internet.task import cooperate
def process_loop(cached):
finished = []
def process_one(value):
if value is None:
finished.append(True)
else:
return some_processing(value)
myiter = cacheiter(cached)
while not finished:
value_deferred = myiter.next()
value_deferred.addCallback(process_one)
yield value_deferred
task = cooperate(process_loop(cached))
d = task.whenDone()
Я думаю, что вы пытаетесь сделать это:
@inlineCallbacks
def cacheiter(cached):
for cachename,cachevalue in cached.items():
result = yield some_deferred() # some deferred you'd like evaluated
if result is True:
# here you want to return something, so you have to use returnValue
# the generator you want to return can be written as a generator expression
gen = ((cachename, k, v) for k,v in cachedvalue.items())
returnValue(gen)
Когда genexp не может выразить то, что вы пытаетесь вернуть, вы можете написать закрытие:
@inlineCallbacks
def cacheiter(cached):
for cachename,cachevalue in cached.items():
result = yield some_deferred()
if result is True:
# define the generator, saving the current values of the cache
def gen(cachedvalue=cachedvalue, cachename=cachename):
for k,v in cachedvalue.items():
yield cachename, k, v
returnValue(gen()) # return it