Hazelcast Jet отбрасывает пустые результаты агрегации
У меня есть скользящее окно и специальный накопитель, который может иметь пустые результаты. Что было бы правильным способом избежать попадания таких "пустых" аккумуляторов агрегации в сток?
Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.<Long, Foo>map("map"))
.map(Map.Entry::getValue)
.addTimestamps(Foo::getTimeMillisecond, LIMIT)
.window(WindowDefinition.sliding(100, 10))
.aggregate(FooAggregateOperations.aggregateFoo(), (s, e, r) -> {
return String.format("started: %s\n%s\nended: %s\n", s, r, e);
})
.drainTo(Sinks.files(sinkDirectory));
Как видите, агрегатор возвращает String:
public class FooAggregateOperations {
public static AggregateOperation1<Foo, FooAccumulator, String> aggregateFoo() {
return AggregateOperation
.withCreate(FooAccumulator::new)
.andAccumulate(FooAggregateOperations::accumulate)
.andCombine(FooAggregateOperations::combine)
.andDeduct(FooAggregateOperations::deduct)
.andFinish(FooAccumulator::getResult);
}
}
По сути, вопрос заключается в том, как отбросить игнорируемые окна / результаты агрегации, прежде чем они будут объединены / вычтены с другими результатами или сброшены в сток?
1 ответ
Чтобы отфильтровать пустые результаты агрегации, вы можете использовать следующий подход:
Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.<Long, Foo>map("map"))
.map(Map.Entry::getValue)
.addTimestamps(Foo::getTimeMillisecond, LIMIT)
.window(WindowDefinition.sliding(100, 10))
.aggregate(FooAggregateOperations.aggregateFoo(),
(s, e, r) -> tuple3(s, e, r))
.filter(t -> !isEmpty(t.f2()))
.map(t -> String.format("started: %s\n%s\nended: %s\n", t.f0(), t.f2(), t.f1()))
.drainTo(Sinks.files("sinkDirectory"));
Это позволяет сохранить результат агрегации во временном кортеже, а затем применить фильтрацию и затем окончательное сопоставление.
Я также создал проблему на GitHub, и мы рассмотрим возможность поддержки этого поведения прямо внутри операции агрегирования.