Невозможно заставить Агрегатор работать
Я пытаюсь понять основы Агрегатора. Ниже приведен пример использования, который я пытаюсь реализовать:
1) Считать сообщение (детали заказа) из очереди.
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
Одно сообщение заказа будет содержать несколько элементов OrderItem. И мы можем ожидать сотни заказов сообщений в очереди.
2) Конечный результат::
a) Каждый элемент заказа должен быть записан в файл.
б) 4 таких файла должны быть записаны в уникальную папку.
Для примера давайте предположим, что мы получили два сообщения заказа, каждое из которых содержит три элемента заказа.
Итак, нам нужно создать 2 папки:
В "папке 1" должно быть 4 файла (по 1 элементу заказа в каждом файле)
В "папке 2" должно быть 2 файла (по 1 элементу заказа в каждом файле). Здесь для простоты мы предполагаем, что больше не поступало сообщений о заказе, и мы можем написать через 5 минут.
Реализация:
- Я могу прочитать сообщение из очереди (websphere MQ) и успешно удалить сообщение.
- Использовал разделитель для разделения сообщения на основе количества элементов порядка.
- Использовал Aggregator для группировки сообщений размером 4.
Я не смог заставить агрегатор работать согласно моему пониманию.
- Я нажимаю один ордер, когда 4 ордера, сообщение агрегируется правильно.
- Я выдвигаю один ордер с 5 ордерами, первые 4 агрегируются, а последний отправляется на канал сброса. Это ожидается, когда MessageGroup освобождается, поэтому последнее сообщение отбрасывается.
- Я нажимаю два ордера, каждый из которых содержит 2 ордера. Последние 2 заказа отправляются на канал сброса.
Стратегия корреляции жестко закодирована (OrderAggregator.java), но вышеприведенный случай должен был сработать.
Нужны указатели на то, как реализовать этот вариант использования, где я могу сгруппировать их в 4 и записать в уникальные папки. Обратите внимание, что все элементы заказа - это независимые заказы книг, и они не имеют отношения к ним.
Ниже приведена конфигурация.
весна-bean.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<int:channel id="mqInbound"/>
<int:channel id="item"/>
<int:channel id="itemList"/>
<int:channel id="aggregatorDiscardChannel"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message- converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:chain id="aggregateList" input-channel="item" output-channel="itemList" >
<int:header-enricher>
<int:header name="sequenceSize" expression="4" overwrite="true"/>
</int:header-enricher>
<int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" />
</int:chain>
<int:service-activator input-channel="itemList" ref="displayAggregatedList" method="display"/>
<int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
<bean id="orderAggregator" class="com.samples.Aggregator.OrderAggregator"/>
<bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
...
....
</beans>
OrderAggregator.java
public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {
return "items";
}
}
DisplayAggregatedList.java
public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
System.out.println("######## Display Aggregated ##############");
for(OrderItemType oit : orderItemTypeList) {
System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
}
}
public void displayDiscarded(Message<?> message) {
System.out.println("######## Display Discarded ##############" + message);
}
}
2 ответа
То, что вам нужно, называется expire-groups-upon-completion
:
Если установлено значение true (по умолчанию false), заполненные группы удаляются из хранилища сообщений, что позволяет последующим сообщениям с той же корреляцией формировать новую группу. Поведение по умолчанию - отправлять сообщения с той же корреляцией, что и у завершенной группы, на канал сброса.
Если вам все равно нужно освободить незавершенные группы (например, осталось 2 заказа), рассмотрите возможность использования group-timeout
: http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html
Пожалуйста, используйте expire-groups-after-complete ="true" и подумайте об использовании MessageCountReleaseStrategy` для стратегии релиза - Артем Билан