Невозможно заставить Агрегатор работать

Я пытаюсь понять основы Агрегатора. Ниже приведен пример использования, который я пытаюсь реализовать:

1) Считать сообщение (детали заказа) из очереди.

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
  <orderItem>
    <isbn>12333454443</isbn>
    <quantity>4</quantity>
  </orderItem>
  <orderItem>
    <isbn>545656777</isbn>
    <quantity>50</quantity>
  </orderItem>
..
..
</order>

Одно сообщение заказа будет содержать несколько элементов OrderItem. И мы можем ожидать сотни заказов сообщений в очереди.

2) Конечный результат::

a) Каждый элемент заказа должен быть записан в файл.

б) 4 таких файла должны быть записаны в уникальную папку.

Для примера давайте предположим, что мы получили два сообщения заказа, каждое из которых содержит три элемента заказа.

Итак, нам нужно создать 2 папки:

В "папке 1" должно быть 4 файла (по 1 элементу заказа в каждом файле)

В "папке 2" должно быть 2 файла (по 1 элементу заказа в каждом файле). Здесь для простоты мы предполагаем, что больше не поступало сообщений о заказе, и мы можем написать через 5 минут.

Реализация:


  1. Я могу прочитать сообщение из очереди (websphere MQ) и успешно удалить сообщение.
  2. Использовал разделитель для разделения сообщения на основе количества элементов порядка.
  3. Использовал Aggregator для группировки сообщений размером 4.

Я не смог заставить агрегатор работать согласно моему пониманию.

  1. Я нажимаю один ордер, когда 4 ордера, сообщение агрегируется правильно.
  2. Я выдвигаю один ордер с 5 ордерами, первые 4 агрегируются, а последний отправляется на канал сброса. Это ожидается, когда MessageGroup освобождается, поэтому последнее сообщение отбрасывается.
  3. Я нажимаю два ордера, каждый из которых содержит 2 ордера. Последние 2 заказа отправляются на канал сброса.
    Стратегия корреляции жестко закодирована (OrderAggregator.java), но вышеприведенный случай должен был сработать.

Нужны указатели на то, как реализовать этот вариант использования, где я могу сгруппировать их в 4 и записать в уникальные папки. Обратите внимание, что все элементы заказа - это независимые заказы книг, и они не имеют отношения к ним.

Ниже приведена конфигурация.

весна-bean.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
  <int:channel id="mqInbound"/>
  <int:channel id="item"/>
  <int:channel id="itemList"/>
  <int:channel id="aggregatorDiscardChannel"/>

  <int-jms:message-driven-channel-adapter id="jmsIn"
                                      channel="mqInbound"
                                      destination="requestQueue" 
                                      message-   converter="orderMessageConverter"/>

  <int:splitter input-channel="mqInbound"  output-channel="item" expression="payload.orderItem"/>

  <int:chain id="aggregateList" input-channel="item" output-channel="itemList"  >
    <int:header-enricher>
      <int:header name="sequenceSize" expression="4" overwrite="true"/>
    </int:header-enricher>
    <int:aggregator  correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders"  discard-channel="aggregatorDiscardChannel" />
  </int:chain>

  <int:service-activator input-channel="itemList"                 ref="displayAggregatedList" method="display"/>
  <int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>

  <bean id="orderAggregator"       class="com.samples.Aggregator.OrderAggregator"/>
  <bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
  ...
  ....
</beans>

OrderAggregator.java

public class OrderAggregator {

@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {

    return orderItemTypeList;
}

@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {

    return "items";
}

}

DisplayAggregatedList.java

public class DisplayAggregatedList {

public void display(List <OrderItemType> orderItemTypeList) {

    System.out.println("######## Display Aggregated ##############");
    for(OrderItemType oit : orderItemTypeList) {
        System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
    }
}
public void displayDiscarded(Message<?> message) {

    System.out.println("######## Display Discarded ##############" + message);
}
}   

2 ответа

Решение

То, что вам нужно, называется expire-groups-upon-completion:

Если установлено значение true (по умолчанию false), заполненные группы удаляются из хранилища сообщений, что позволяет последующим сообщениям с той же корреляцией формировать новую группу. Поведение по умолчанию - отправлять сообщения с той же корреляцией, что и у завершенной группы, на канал сброса.

Если вам все равно нужно освободить незавершенные группы (например, осталось 2 заказа), рассмотрите возможность использования group-timeout: http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html

Пожалуйста, используйте expire-groups-after-complete ="true" и подумайте об использовании MessageCountReleaseStrategy` для стратегии релиза - Артем Билан

Другие вопросы по тегам