Группировка StreamEx по спискам возвращает неверное количество записей
Следующий код разбивает поток объектов на куски по 1000, обрабатывает их при материализации и возвращает общее количество объектов в конце.
Во всех случаях возвращаемое число является правильным, если размер потока не равен 1. В случае, если размер потока равен 1, возвращаемое число равно 0.
Любая помощь будет принята с благодарностью. Мне также пришлось взломать обратный вызов в случае, если в потоке нет записей равных 0. Я бы тоже хотел это исправить.
AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
.forEach((chunk) ->
{
//... process each chunk
}
);
} catch(Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();
4 ответа
В конце я пошел с Iterators.partition() Гуавы, чтобы разделить мой поток объектов на куски:
MutableInt recordCounter = new MutableInt();
try {
Iterators.partition(myObjects.iterator(), 1000)
.forEachRemaining((chunk) -> {
//process each chunk
...
recordCounter.add(chunk.size());
});
} catch (Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
return recordCounter.getValue();
Как говорит JavaDoc:
sameGroup - невмешивающий предикат без сохранения состояния, применяемый к паре смежных элементов, который возвращает true для элементов, принадлежащих к той же группе.
Предикат должен быть без гражданства, а это не ваш случай. Вы неправильно используете метод, поэтому не можете получить ожидаемый результат. Он работает близко к тому, что вы хотите, чисто случайно, вы не можете полагаться на это поведение, оно может измениться в будущих версиях StreamEx.
@Nazarii Бардюк объяснил, почему это не работает. Я отвечаю аналогичным требованиям, чтобы разделить поток раньше. Поэтому я разобрался и внес несколько изменений в: StreamEx-0.8.7. Вот простой пример:
int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
System.out.println(chunk);
return chunk.size();
}).sum();
System.out.println(count);
Если вы находитесь в начале вашего проекта, вы можете попробовать, и код будет:
try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
return stream.splitToList(1000)
.mapToInt((chunk) -> {
//... process each chunk
return chunk.size();
}).sum();
}
Изначально счетчик использовался, чтобы знать, когда следует разбивать порции, и подсчет общего количества объектов ненадежен. Когда поток имеет размер 0 или 1 groupRuns
функция не выполнена.
Поэтому вам нужен другой способ подсчета объектов. Вместо того, чтобы просто потреблять предметы в forEach
вы можете вернуть количество обработанных объектов chunk.size()
а также sum
их в конце
AtomicInteger counter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
return stream
.groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
.mapToLong((chunk) -> {
//... process each chunk
return chunk.size();
})
.sum();
} catch(Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}