Разбить генератор на куски, не прогуливая его
(Этот вопрос связан с этим и с этим, но это предварительная ходьба генератора, чего я и хочу избежать)
Я хотел бы разбить генератор на куски. Требования следующие:
- не дополняйте чанки: если количество оставшихся элементов меньше размера чанка, последний чанк должен быть меньше.
- не обходите генератор заранее: вычисление элементов стоит дорого, и это должно выполняться только функцией-потребителем, а не блоком
- что означает, конечно: не накапливать в памяти (нет списков)
Я пробовал следующий код:
def head(iterable, max=10):
for cnt, el in enumerate(iterable):
yield el
if cnt >= max:
break
def chunks(iterable, size=10):
i = iter(iterable)
while True:
yield head(i, size)
# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)
for n, chunk in enumerate(chunks(els, 3)):
for el in chunk:
print 'Chunk %3d, value %d' % (n, el)
И это как-то работает:
Chunk 0, value 0
Chunk 0, value 1
Chunk 0, value 2
Chunk 1, value 3
Chunk 1, value 4
Chunk 1, value 5
Chunk 2, value 6
^CTraceback (most recent call last):
File "xxxx.py", line 15, in <module>
for el in chunk:
File "xxxx.py", line 2, in head
for cnt, el in enumerate(iterable):
KeyboardInterrupt
Buuuut... это никогда не останавливается (я должен нажать ^C
) из-за while True
, Я хотел бы остановить этот цикл всякий раз, когда генератор потребляется, но я не знаю, как обнаружить эту ситуацию. Я пытался поднять исключение:
class NoMoreData(Exception):
pass
def head(iterable, max=10):
for cnt, el in enumerate(iterable):
yield el
if cnt >= max:
break
if cnt == 0 : raise NoMoreData()
def chunks(iterable, size=10):
i = iter(iterable)
while True:
try:
yield head(i, size)
except NoMoreData:
break
# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)
for n, chunk in enumerate(chunks(els, 2)):
for el in chunk:
print 'Chunk %3d, value %d' % (n, el)
Но тогда исключение возникает только в контексте потребителя, а это не то, что я хочу (я хочу, чтобы код потребителя был чистым)
Chunk 0, value 0
Chunk 0, value 1
Chunk 0, value 2
Chunk 1, value 3
Chunk 1, value 4
Chunk 1, value 5
Chunk 2, value 6
Traceback (most recent call last):
File "xxxx.py", line 22, in <module>
for el in chunk:
File "xxxx.py", line 9, in head
if cnt == 0 : raise NoMoreData
__main__.NoMoreData()
Как я могу обнаружить, что генератор исчерпан в chunks
функция, не ходя это?
12 ответов
Одним из способов было бы посмотреть на первый элемент, если таковой имеется, а затем создать и вернуть фактический генератор.
def head(iterable, max=10):
first = next(iterable) # raise exception when depleted
def head_inner():
yield first # yield the extracted first element
for cnt, el in enumerate(iterable):
yield el
if cnt + 1 >= max: # cnt + 1 to include first
break
return head_inner()
Просто используйте это в своем chunk
генератор и поймать StopIteration
исключение, как вы сделали с вашим пользовательским исключением.
Обновление: вот еще одна версия, используя itertools.islice
заменить большую часть head
функция и for
петля. Это просто for
цикл на самом деле делает то же самое, что громоздкий while-try-next-except-break
построить в исходном коде, так что результат гораздо более читабелен.
def chunks(iterable, size=10):
iterator = iter(iterable)
for first in iterator: # stops when iterator is depleted
def chunk(): # construct generator for next chunk
yield first # yield element from for loop
for more in islice(iterator, size - 1):
yield more # yield more elements from the iterator
yield chunk() # in outer generator, yield next chunk
И мы можем стать еще короче, используя itertools.chain
заменить внутренний генератор:
def chunks(iterable, size=10):
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, size - 1))
Другой способ создания групп / чанков, а не предварительная прогулка - использование генератора itertools.groupby
на ключевой функции, которая использует itertools.count
объект. Так как count
объект не зависит от итерируемого, чанки могут быть легко сгенерированы без знания того, что содержит итерируемое.
Каждая итерация groupby
вызывает next
метод count
объект и генерирует ключ группы / чанка (за которым следуют элементы в чанке) путем целочисленного деления текущего значения счетчика на размер чанка.
from itertools import groupby, count
def chunks(iterable, size=10):
c = count()
for _, g in groupby(iterable, lambda _: next(c)//size):
yield g
Каждая группа / чанк g
В результате получается функция генератора итератор. Тем не менее, так как groupby
использует общий итератор для всех групп, групповые итераторы не могут быть сохранены в списке или каком-либо контейнере, каждый групповой итератор должен использоваться до следующей.
Самое быстрое решение, которое я мог бы найти, благодаря (в CPython) использованию встроенных функций уровня C. Таким образом, не требуется никакого байт-кода Python для создания каждого фрагмента (если только базовый генератор не реализован в Python), который имеет огромное преимущество в производительности. Он обходит каждый кусок перед тем, как вернуть его, но он не выполняет предварительную прогулку за пределы фрагмента, который собирается вернуть:
# Py2 only to get generator based map
from future_builtins import map
from itertools import islice, repeat, starmap, takewhile
# operator.truth is *significantly* faster than bool for the case of
# exactly one positional argument
from operator import truth
def chunker(n, iterable): # n is size of each chunk; last chunk may be smaller
return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))
Так как это немного плотно, разложенная версия для иллюстрации:
def chunker(n, iterable):
iterable = iter(iterable)
while True:
x = tuple(islice(iterable, n))
if not x:
return
yield x
Завершение звонка chunker
в enumerate
позволит вам нумеровать куски, если это необходимо.
more-itertools предоставил chunked и ichunked, которые могут достичь цели, это упоминается на странице документа Python 3 itertools.
Как насчет использования itertools.islice
:
import itertools
els = iter(xrange(7))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
Который дает:
[0, 1]
[2, 3]
[4, 5]
[6]
Начал осознавать полезность этого сценария, когда создавал решение для вставки БД из 500 тыс. Строк + с более высокой скоростью.
Генератор обрабатывает данные из источника и "выдает" их построчно; а затем другой генератор группирует выходные данные по блокам и "возвращает" его по блокам. Второй генератор знает только размер куска и ничего более.
Ниже приведен пример, чтобы подчеркнуть концепцию:
#!/usr/bin/python
def firstn_gen(n):
num = 0
while num < n:
yield num
num += 1
def chunk_gen(some_gen, chunk_size=7):
res_chunk = []
for count, item in enumerate(some_gen, 1):
res_chunk.append(item)
if count % chunk_size == 0:
yield res_chunk
res_chunk[:] = []
else:
yield res_chunk
if __name__ == '__main__':
for a_chunk in chunk_gen(firstn_gen(33)):
print(a_chunk)
Протестировано в Python 2.7.12:
[0, 1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12, 13]
[14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27]
[28, 29, 30, 31, 32]
У меня была такая же проблема, но я нашел более простое решение, чем упомянутое здесь:
def chunker(iterable, chunk_size):
els = iter(iterable)
while True:
next_el = next(els)
yield chain([next_el], islice(els, chunk_size - 1))
for i, chunk in enumerate(chunker(range(11), 2)):
for el in chunk:
print(i, el)
# Prints the following:
0 0
0 1
1 2
1 3
2 4
2 5
3 6
3 7
4 8
4 9
5 10
from itertools import islice
def chunk(it, n):
'''
# returns chunks of n elements each
>>> list(chunk(range(10), 3))
[
[0, 1, 2, ],
[3, 4, 5, ],
[6, 7, 8, ],
[9, ]
]
>>> list(chunk(list(range(10)), 3))
[
[0, 1, 2, ],
[3, 4, 5, ],
[6, 7, 8, ],
[9, ]
]
'''
def _w(g):
return lambda: tuple(islice(g, n))
return iter(_w(iter(it)), ())
Вдохновленный Moses Koledoye, я попытался найти решение, которое использует itertools.groupby, но не требует разделения на каждом этапе.
Следующая функция может использоваться в качестве ключа для groupby, и она просто возвращает логическое значение, которое переворачивается после предварительно определенного числа вызовов.
def chunks(chunksize=3):
def flag_gen():
flag = False
while True:
for num in range(chunksize):
yield flag
flag = not flag
flag_iter = flag_gen()
def flag_func(*args, **kwargs):
return next(flag_iter)
return flag_func
Который можно использовать так:
from itertools import groupby
my_long_generator = iter("abcdefghijklmnopqrstuvwxyz")
chunked_generator = groupby(my_long_generator, key=chunks(chunksize=5))
for flag, chunk in chunked_generator:
print("Flag is {f}".format(f=flag), list(chunk))
Выход:
Flag is False ['a', 'b', 'c', 'd', 'e']
Flag is True ['f', 'g', 'h', 'i', 'j']
Flag is False ['k', 'l', 'm', 'n', 'o']
Flag is True ['p', 'q', 'r', 's', 't']
Flag is False ['u', 'v', 'w', 'x', 'y']
Flag is True ['z']
Я сделал скрипку, демонстрирующую этот код.
Три варианта самого быстрого решения ShadowRanger до Python-3.12, использующего получение кортежей фрагментов и удаление значения заполнения из последнего фрагмента. ОниStefan_*
в этом тесте для итерации с 10^6 элементами и размером фрагмента от n=2 до n=1000:
Протестировано на Python 3.10.8. Понятия не имею, что происходит с n=7 до n=10, я запускал тест несколько раз, и всегда так.
Stefan_hold
Этот блок содержит один фрагмент, и для каждого следующего фрагмента он возвращает удерживаемый фрагмент, а затем вместо него удерживает следующий. В конце очистите и дайте последний удерживаемый фрагмент.
def chunker(n, iterable):
fillvalue = object()
args = repeat(iter(iterable), n)
chunks = zip_longest(*args, fillvalue=fillvalue)
for chunk in chunks:
for next_chunk in chunks:
yield chunk
chunk = next_chunk
yield unfill(chunk, fillvalue)
ShadowRanger проверяетif x[-1] is fillvalue:
за каждый кусокx
. В этом решении я вместо этого использую более быстрыйif stopped:
проверка значения bool. Чтобы это работало, я привязываю ко входу «пустой» итератор, единственная задача которого — установитьstopped
кTrue
:
def chunker(n, iterable):
stopped = False
def stop():
nonlocal stopped
stopped = True
return
yield
fillvalue = object()
it = iter(iterable)
most = repeat(it, n-1)
last = chain(it, stop())
for chunk in zip_longest(*most, last, fillvalue=fillvalue):
if stopped:
yield unfill(chunk, fillvalue)
else:
yield chunk
del chunk
Stefan_compress
Этот вариант исключает интерпретацию Python на протяжении всего фрагмента, кроме последнего. Оно используетtee
чтобы получить три копии итератора фрагментов из файла . Копия вносит все, кроме последнего фрагмента. Копия находится на один фрагмент впереди, ее единственная задача — не допустить последнего фрагмента изmain
. slow
копия на шаг отстаетfast
и предоставляет последний кусок.
def chunker(n, iterable):
fillvalue = object()
args = (iter(iterable),) * n
chunks = zip_longest(*args, fillvalue=fillvalue)
main, fast, slow = tee(chunks, 3)
next(fast, None)
return chain(
compress(main, zip(fast, slow)),
(unfill(chunk, fillvalue) for chunk in slow)
)
unfill
Помощник, который мои решения используют для удаления значения заполнения из последнего фрагмента:
def unfill(chunk, fillvalue):
return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]
Оптимизация
ShadowRanger_del
является незначительной модификацией решения ShadowRanger, удаляющей переменную фрагмента после получения фрагмента, так чтоzip_longest
может использовать оптимизацию повторного использования кортежа результатов. МойStefan_stopped
также использует эту оптимизацию.
Хотя это помогает/работает только в том случае, если потребитель нашего итератора фрагментов также не сохраняет ссылку на кортеж, например, если он используетmap(sum, chunker(...))
вычислять суммы фрагментов чисел.
Вот тот же тест, но без негоdel
оптимизация:
Полный код
Решения, проверка правильности, бенчмаркинг, построение графиков.
import sys
print(sys.version)
import matplotlib.pyplot as plt
from itertools import takewhile, zip_longest, chain, compress, tee, repeat
from timeit import timeit
from statistics import mean, stdev
from collections import deque
import gc
from random import sample
from bisect import bisect
#----------------------------------------------------------------------
# Solutions
#----------------------------------------------------------------------
def ShadowRanger(n, iterable):
'''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
fillvalue = object() # Anonymous sentinel object that can't possibly appear in input
args = (iter(iterable),) * n
for x in zip_longest(*args, fillvalue=fillvalue):
if x[-1] is fillvalue:
# takewhile optimizes a bit for when n is large and the final
# group is small; at the cost of a little performance, you can
# avoid the takewhile import and simplify to:
# yield tuple(v for v in x if v is not fillvalue)
yield tuple(takewhile(lambda v: v is not fillvalue, x))
else:
yield x
def ShadowRanger_del(n, iterable):
'''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
fillvalue = object() # Anonymous sentinel object that can't possibly appear in input
args = (iter(iterable),) * n
for x in zip_longest(*args, fillvalue=fillvalue):
if x[-1] is fillvalue:
# takewhile optimizes a bit for when n is large and the final
# group is small; at the cost of a little performance, you can
# avoid the takewhile import and simplify to:
# yield tuple(v for v in x if v is not fillvalue)
yield tuple(takewhile(lambda v: v is not fillvalue, x))
else:
yield x
del x
def unfill(chunk, fillvalue):
return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]
def Stefan_stopped(n, iterable):
stopped = False
def stop():
nonlocal stopped
stopped = True
return
yield
fillvalue = object()
it = iter(iterable)
most = repeat(it, n-1)
last = chain(it, stop())
for chunk in zip_longest(*most, last, fillvalue=fillvalue):
if stopped:
yield unfill(chunk, fillvalue)
else:
yield chunk
del chunk
def Stefan_compress(n, iterable):
fillvalue = object()
args = (iter(iterable),) * n
chunks = zip_longest(*args, fillvalue=fillvalue)
main, fast, slow = tee(chunks, 3)
next(fast, None)
return chain(
compress(main, zip(fast, slow)),
(unfill(chunk, fillvalue) for chunk in slow)
)
def Stefan_hold(n, iterable):
fillvalue = object()
args = repeat(iter(iterable), n)
chunks = zip_longest(*args, fillvalue=fillvalue)
for chunk in chunks:
for next_chunk in chunks:
yield chunk
chunk = next_chunk
yield unfill(chunk, fillvalue)
funcs = ShadowRanger, ShadowRanger_del, Stefan_stopped, Stefan_compress, Stefan_hold
#----------------------------------------------------------------------
# Correctness checks
#----------------------------------------------------------------------
def run(f):
return list(f(n, iter(range(N))))
for n in range(1, 10):
for N in range(100):
args = n, range(N)
expect = run(ShadowRanger)
for f in funcs:
result = run(f)
if result != expect:
raise Exception(f'{f.__name__} failed for {n=}, {N=}')
#----------------------------------------------------------------------
# Benchmarking
#----------------------------------------------------------------------
benchmarks = []
# Speed
N = 10 ** 6
for n in *range(2, 11), 20, 50, 100, 200, 500, 1000:
for _ in range(1):
times = {f: [] for f in funcs}
def stats(f):
ts = [t * 1e3 for t in sorted(times[f])[:10]]
return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} ms '
for _ in range(100):
for f in sample(funcs, len(funcs)):
gc.collect()
t = timeit(lambda: deque(f(n, repeat(None, N)), 0), number=1)
times[f].append(t)
print(f'\n{n = }')
for f in sorted(funcs, key=stats):
print(stats(f), f.__name__)
benchmarks.append((n, times))
#----------------------------------------------------------------------
# Plotting
#----------------------------------------------------------------------
names = [f.__name__ for f in funcs]
stats = [
(n, [mean(sorted(times[f])[:10]) * 1e3 for f in funcs])
for n, times in benchmarks
]
colors = {
'Stefan_stopped': 'blue',
'Stefan_compress': 'lime',
'Stefan_hold': 'gold',
'ShadowRanger': 'red',
'ShadowRanger_del': 'pink',
}
ns = [n for n, _ in stats]
print('%28s' % 'chunk size', *('%5d' % n for n in ns))
print('-' * 95)
x = range(len(ns))
for i, name in enumerate(names):
ts = [tss[i] for _, tss in stats]
ts = [tss[i] / tss[0] * 100 for _, tss in stats]
color = colors[name]
if color:
plt.plot(x, ts, '.-', color=color, label=name)
print('%29s' % name, *('%5.1f' % t for t in ts))
plt.xticks(x, ns, size=9)
plt.ylim(0, 120)
plt.title('chunker(n, $10^6$ elements)', weight='bold')
plt.xlabel('Chunk size n', weight='bold')
plt.ylabel("Time (for complete iteration) in % relative to ShadowRanger's", weight='bold')
plt.legend(loc='lower center')
#plt.show()
plt.savefig('chunker_plot.png', dpi=200)
Вы сказали, что не хотите хранить вещи в памяти, значит ли это, что вы не можете создать промежуточный список для текущего чанка?
Почему бы не пройти через генератор и не вставить значение часового между кусками? Потребитель (или подходящая оболочка) может игнорировать стража:
class Sentinel(object):
pass
def chunk(els, size):
for i, el in enumerate(els):
yield el
if i > 0 and i % size == 0:
yield Sentinel
РЕДАКТИРОВАТЬ другое решение с генератором генераторов
Вы не должны делать while True
в вашем итераторе, но просто переберите его и обновите номер чанка на каждой итерации:
def chunk(it, maxv):
n = 0
for i in it:
yield n // mavx, i
n += 1
Если вы хотите генератор генераторов, вы можете иметь:
def chunk(a, maxv):
def inner(it, maxv, l):
l[0] = False
for i in range(maxv):
yield next(it)
l[0] = True
raise StopIteration
it = iter(a)
l = [True]
while l[0] == True:
yield inner(it, maxv, l)
raise StopIteration
с существом итеративным.
Тесты: на Python 2.7 и 3.4:
for i in chunk(range(7), 3):
print 'CHUNK'
for a in i:
print a
дает:
CHUNK
0
1
2
CHUNK
3
4
5
CHUNK
6
И на 2.7:
for i in chunk(xrange(7), 3):
print 'CHUNK'
for a in i:
print a
дает тот же результат.
Но ВНИМАНИЕ: list(chunk(range(7))
блоки на 2.7 и 3.4