Закройте все агрегированные группы после завершения разделения файлов в Camel 2.24
У меня есть вариант использования для разделения файла по строкам и агрегирования на основе некоторого ключа и освобождения всех агрегированных групп после того, как все записи в файле завершат разделение. Количество записей будет неизвестно, и я хотел бы сделать это в версии Camel 2.24.
Пример макета файла:
{
"key": "key1",
"rec": "rec1"
},
{
"key": "key2",
"rec": "rec2"
},
{
"key": "key1",
"rec": "rec3"
},
{
"key": "key2",
"rec": "rec4"
}
Результат должен быть следующим:
{
"key": "key1",
"rec": "rec1rec3"
},
{
"key": "key2",
"rec": "rec2rec4"
}
Все агрегированные группы должны быть освобождены после обработки всех записей в файле (завершение разделения)
<route id="FileReadRoute">
<from uri="file:///Users/project?noop=false&idempotent=false&delay=30s" />
<split streaming="true">
<tokenize token="\n"/>
<setHeader headerName="keyData">
<jsonpath>$.key</jsonpath>
</setHeader>
<to uri="direct:AggregatorflowStart" />
</split>
</route>
<route id="route_Aggregator">
<from uri="direct:AggregatorflowStart"/>
<aggregate strategyRef="myAggregation" ignoreInvalidCorrelationKeys="true" completionTimeout="300000" >
<correlationExpression><header>keyData</header></correlationExpression>
<log message="After aggregation completion = Sending out ${body}"/>
</aggregate>
</route>
Код стратегии агрегации:
public class myAggregation implements AggregationStrategy, Predicate {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
//usual logic to aggregateate the message
}
@Override
public boolean matches(Exchange exchange) {
return ((boolean) exchange.getProperty("CamelSplitComplete"));
}
}
Этот предикат освобождает только последнюю агрегированную группу (в этом примере освобождена только группа key2, а не группа key1). но я хочу освободить все группы внутри агрегации. Я понимаю, что есть новые опции, доступные начиная с Camel 2.9 и 3, и все работает нормально, так как я могу его протестировать. но я хочу, чтобы это было достигнуто с версией camel 2.24, поскольку я не могу перенести свое приложение на версию 2.9 или более.
Решение может быть в XML DSL или Java-коде, чтобы закрыть все агрегированные группы. Может кто-нибудь дайте мне знать, как с этим справиться?
Заранее спасибо!..