Итерировать итератор по кусочкам (из n) в Python?
Можете ли вы придумать хороший способ (возможно, с помощью itertools) разбить итератор на куски заданного размера?
Следовательно l=[1,2,3,4,5,6,7]
с chunks(l,3)
становится итератором [1,2,3], [4,5,6], [7]
Я могу придумать небольшую программу для этого, но это не очень хороший способ, может быть, с itertools.
16 ответов
grouper()
рецепт из itertools
рецепты документации приближаются к тому, что вы хотите:
def grouper(n, iterable, fillvalue=None):
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)
Впрочем, он заполнит последний кусок значением заполнения.
Менее общее решение, которое работает только с последовательностями, но обрабатывает последний фрагмент по желанию:
[my_list[i:i + chunk_size] for i in range(0, len(my_list), chunk_size)]
Наконец, решение, которое работает на общих итераторах a, ведет себя так, как нужно:
def grouper(n, iterable):
it = iter(iterable)
while True:
chunk = tuple(itertools.islice(it, n))
if not chunk:
return
yield chunk
Хотя OP просит функцию возвращать чанки в виде списка или кортежа, в случае, если вам нужно вернуть итераторы, решение Sven Marnach можно изменить:
def grouper_it(n, iterable):
it = iter(iterable)
while True:
chunk_it = itertools.islice(it, n)
try:
first_el = next(chunk_it)
except StopIteration:
return
yield itertools.chain((first_el,), chunk_it)
Некоторые тесты: http://pastebin.com/YkKFvm8b
Это будет немного более эффективно, только если ваша функция будет перебирать элементы в каждом чанке.
Python 3.12 добавляет itertools.batched, который работает со всеми итерируемыми объектами (включая списки):
>>> from itertools import batched
>>> for batch in batched('ABCDEFG', 3):
... print(batch)
('A', 'B', 'C')
('D', 'E', 'F')
('G',)
Начиная с python 3.8, существует более простое решение с использованием
:=
оператор:
def grouper(it: Iterator, n: int) -> Iterator[list]:
while chunck := list(itertools.islice(it, n)):
yield chunck
Применение:
>>> list(grouper(iter('ABCDEFG'), 3))
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
Это будет работать на любой итерации. Возвращает генератор генераторов (для полной гибкости). Теперь я понимаю, что это в основном то же самое, что и решение @reclosedevs, но без пуха. Нет необходимости try...except
как StopIteration
распространяется вверх, что мы и хотим.
next(iterable)
вызов нужен, чтобы поднять StopIteration
когда итерируемое пусто, так как islice
будет продолжать порождать пустые генераторы навсегда, если вы позволите.
Это лучше, потому что это всего две строки, но легко понять.
def grouper(iterable, n):
while True:
yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
Обратите внимание, что next(iterable)
помещается в кортеж В противном случае, если next(iterable)
сам был повторяем, то itertools.chain
сгладит это. Спасибо Джереми Брауну за указание на эту проблему.
Я работал над чем-то сегодня и придумал, как мне кажется, простое решение. Это похоже на ответ jsbueno, но я считаю, что его даст пустой group
с, когда длина iterable
делится на n
, Мой ответ делает простую проверку, когда iterable
исчерпан.
def chunk(iterable, chunk_size):
"""Generate sequences of `chunk_size` elements from `iterable`."""
iterable = iter(iterable)
while True:
chunk = []
try:
for _ in range(chunk_size):
chunk.append(iterable.next())
yield chunk
except StopIteration:
if chunk:
yield chunk
break
Вот тот, который возвращает ленивые куски; использование map(list, chunks(...))
если вы хотите списки.
from itertools import islice, chain
from collections import deque
def chunks(items, n):
items = iter(items)
for first in items:
chunk = chain((first,), islice(items, n-1))
yield chunk
deque(chunk, 0)
if __name__ == "__main__":
for chunk in map(list, chunks(range(10), 3)):
print chunk
for i, chunk in enumerate(chunks(range(10), 3)):
if i % 2 == 1:
print "chunk #%d: %s" % (i, list(chunk))
else:
print "skipping #%d" % i
Краткая реализация:
chunker = lambda iterable, n: (ifilterfalse(lambda x: x == (), chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=())))
Это работает, потому что [iter(iterable)]*n
список, содержащий один и тот же итератор n раз; сжатие, которое занимает один элемент от каждого итератора в списке, который является одним и тем же итератором, в результате чего каждый zip-элемент содержит группу n
Предметы.
izip_longest
необходим для полного использования базовой итерируемой, а не остановки итерации при достижении первого исчерпанного итератора, который отсекает любой остаток от iterable
, Это приводит к необходимости отфильтровывать значение заполнения. Поэтому чуть более надежная реализация будет:
def chunker(iterable, n):
class Filler(object): pass
return (ifilterfalse(lambda x: x is Filler, chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=Filler)))
Это гарантирует, что значение заполнения никогда не будет элементом в базовой итерации. Используя определение выше:
iterable = range(1,11)
map(tuple,chunker(iterable, 3))
[(1, 2, 3), (4, 5, 6), (7, 8, 9), (10,)]
map(tuple,chunker(iterable, 2))
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
map(tuple,chunker(iterable, 4))
[(1, 2, 3, 4), (5, 6, 7, 8), (9, 10)]
Эта реализация почти делает то, что вы хотите, но у нее есть проблемы:
def chunks(it, step):
start = 0
while True:
end = start+step
yield islice(it, start, end)
start = end
(Разница в том, что потому что islice
не вызывает StopIteration или что-либо еще на вызовах, которые выходят за пределы конца it
это принесет навсегда; Есть также немного сложная проблема, что islice
результаты должны быть использованы до того, как этот генератор будет повторен).
Чтобы создать движущееся окно функционально:
izip(count(0, step), count(step, step))
Итак, это становится:
(it[start:end] for (start,end) in izip(count(0, step), count(step, step)))
Но это все равно создает бесконечный итератор. Итак, вам нужно время (или, возможно, что-то еще может быть лучше), чтобы ограничить его:
chunk = lambda it, step: takewhile((lambda x: len(x) > 0), (it[start:end] for (start,end) in izip(count(0, step), count(step, step))))
g = chunk(range(1,11), 3)
tuple(g)
([1, 2, 3], [4, 5, 6], [7, 8, 9], [10])
"Проще лучше, чем сложнее" - простой генератор в несколько строк может сделать эту работу. Просто поместите его в какой-нибудь модуль утилит или около того:
def grouper (iterable, n):
iterable = iter(iterable)
count = 0
group = []
while True:
try:
group.append(next(iterable))
count += 1
if count % n == 0:
yield group
group = []
except StopIteration:
yield group
break
Код гольф-версии:
def grouper(iterable, n):
for i in range(0, len(iterable), n):
yield iterable[i:i+n]
Использование:
>>> list(grouper('ABCDEFG', 3))
['ABC', 'DEF', 'G']
Ну вот.
def chunksiter(l, chunks):
i,j,n = 0,0,0
rl = []
while n < len(l)/chunks:
rl.append(l[i:j+chunks])
i+=chunks
j+=j+chunks
n+=1
return iter(rl)
def chunksiter2(l, chunks):
i,j,n = 0,0,0
while n < len(l)/chunks:
yield l[i:j+chunks]
i+=chunks
j+=j+chunks
n+=1
Примеры:
for l in chunksiter([1,2,3,4,5,6,7,8],3):
print(l)
[1, 2, 3]
[4, 5, 6]
[7, 8]
for l in chunksiter2([1,2,3,4,5,6,7,8],3):
print(l)
[1, 2, 3]
[4, 5, 6]
[7, 8]
for l in chunksiter2([1,2,3,4,5,6,7,8],5):
print(l)
[1, 2, 3, 4, 5]
[6, 7, 8]
Пара улучшений в , которые делают это:
Работайте более эффективно и с меньшим количеством стандартного кода в цикле, делегируя извлечение первого элемента самому Python, а не вручную с помощью
next
позвонить вtry
/except StopIteration:
блокироватьОбработать случай, когда пользователь отбрасывает остальные элементы в любом заданном фрагменте (например, внутренний цикл по фрагменту s при определенных условиях); в ответе reclosedevрешении reclosedev , кроме самого первого элемента (который определенно потребляется), любые другие «пропущенные» элементы на самом деле не пропускаются (они просто становятся начальными элементами следующего фрагмента, что означает, что вы больше не извлекаете данные из
n
-выровненные смещения, и если вызывающийbreak
sa перебирают фрагмент, они должны вручную потреблять оставшиеся элементы, даже если они им не нужны)
Объединение этих двух исправлений дает:
import collections # At top of file
from itertools import chain, islice # At top of file, denamespaced for slight speed boost
# Pre-create a utility "function" that silently consumes and discards all remaining elements in
# an iterator. This is the fastest way to do so on CPython (deque has a specialized mode
# for maxlen=0 that pulls and discards faster than Python level code can, and by precreating
# the deque and prebinding the extend method, you don't even need to create new deques each time)
_consume = collections.deque(maxlen=0).extend
def batched_it(iterable, n):
"Batch data into sub-iterators of length n. The last batch may be shorter."
# batched_it('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
n -= 1 # First element pulled for us, pre-decrement n so we don't redo it every loop
it = iter(iterable)
for first_el in it:
chunk_it = islice(it, n)
try:
yield chain((first_el,), chunk_it)
finally:
_consume(chunk_it) # Efficiently consume any elements caller didn't consume
Я забыл, где я нашел вдохновение для этого. Я немного изменил его для работы с GUID MSI в реестре Windows:
def nslice(s, n, truncate=False, reverse=False):
"""Splits s into n-sized chunks, optionally reversing the chunks."""
assert n > 0
while len(s) >= n:
if reverse: yield s[:n][::-1]
else: yield s[:n]
s = s[n:]
if len(s) and not truncate:
yield s
reverse
не относится к вашему вопросу, но я активно использую эту функцию.
>>> [i for i in nslice([1,2,3,4,5,6,7], 3)]
[[1, 2, 3], [4, 5, 6], [7]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True)]
[[1, 2, 3], [4, 5, 6]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True, reverse=True)]
[[3, 2, 1], [6, 5, 4]]
Рекурсивное решение:
def batched(i: Iterable, split: int) -> Tuple[Iterable, ...]:
if chunk := i[:split]:
yield chunk
yield from batched(i[split:], split)
Эта функция принимает итерации, которые не должны бытьSized
, поэтому он также будет принимать итераторы. Он поддерживает бесконечные итерации и выдаст ошибку, если будут выбраны фрагменты с размером меньше 1 (даже если указание size == 1 фактически бесполезно).
Аннотации типа, конечно, необязательны, и/
в параметрах (что делаетiterable
только позиционные) при желании можно удалить.
T = TypeVar("T")
def chunk(iterable: Iterable[T], /, size: int) -> Generator[list[T], None, None]:
"""Yield chunks of a given size from an iterable."""
if size < 1:
raise ValueError("Cannot make chunks smaller than 1 item.")
def chunker():
current_chunk = []
for item in iterable:
current_chunk.append(item)
if len(current_chunk) == size:
yield current_chunk
current_chunk = []
if current_chunk:
yield current_chunk
# Chunker generator is returned instead of yielding directly so that the size check
# can raise immediately instead of waiting for the first next() call.
return chunker()
Вот простой:
n=2
l = list(range(15))
[l[i:i+n] for i in range(len(l)) if i%n==0]
Out[10]: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14]]
for i in range(len(l)): эта часть определяет итерацию по индексам l с использованием функции range() и len(l) в качестве верхнего предела.
if i % n == 0: это условие фильтрует элементы для нового списка. i % n проверяет, делится ли текущий индекс i на n без остатка. Если это так, элемент по этому индексу будет включен в новый список; в противном случае оно будет пропущено.
l[i:i+n]: эта часть извлекает подсписок из l. Он использует нотацию среза для указания диапазона индексов от i до i+n-1. Итак, для каждого индекса i, который соответствует условию i % n == 0, создается подсписок длины n, начиная с этого индекса.
Альтернатива (быстрее для больших вещей):
[l[i:i+n] for i in range(0,len(l),n)]