Как обработать порядок сообщений в JMS?

Я рассматриваю клиент-серверное приложение, написанное на Java. Сервер принимает сообщения JMS и обрабатывает их, но сообщения могут приходить в непредвиденном порядке, и перед сообщением заказа может прийти отмена. Как вы справляетесь с таким делом? Вы делаете это в MDB?

Каковы некоторые стратегии или модели для такого сценария?

5 ответов

Решение

Насколько я знаю, это называется доставкой "вне очереди" и является частью атрибутов качества обслуживания (QoS) системы JMS. Я не думаю, что это является частью спецификации JMS, но некоторые провайдеры поддерживают это, может быть. Это будет зависеть от конкретной реализации JMS, которую вы используете.

Однако обратите внимание, что JMS предназначен для рассылки сообщений нескольким потребителям таким образом, чтобы распределить нагрузку. Если сообщение должно быть доставлено заказанным способом, это невозможно - это в основном приводит к сериализации доставки сообщения, и сообщение не может быть обработано одновременно.

Википедия говорит это лучше меня:

Очередь JMS Промежуточная область, которая содержит сообщения, которые были отправлены и ожидают чтения. Обратите внимание, что вопреки тому, что предлагает очередь имен, сообщения не должны доставляться в порядке отправки. Если управляемый сообщениями пул компонентов содержит более одного экземпляра, тогда сообщения могут обрабатываться одновременно, и, таким образом, возможно, что более позднее сообщение будет обработано раньше, чем более раннее. Очередь JMS гарантирует только то, что каждое сообщение обрабатывается только один раз.

Внезапный запрос на отмену нелегко достичь с помощью JMS. Две идеи:

  • Хранить тикет, который соответствует каждому сообщению в базе данных, можно легко использовать для отмены сообщения. Когда сообщение доставлено, MDB проверяет, действителен ли соответствующий билет. Если да, продолжается далее, если нет, отбросьте сообщение.
  • Попробуйте установить размер пула MDB равным единице. Возможно, в этом случае доставка будет заказана. Изменение размера пула - приложение. зависит от сервера, но большинство из них поддерживают размер пула bean-компонента.

В противном случае, возможно, посмотрите на шаблон хранилища сообщений. В любом случае стоит проверить сайт EAI.

Ваша система будет намного более гибкой, если она сможет справиться с сообщениями, вышедшими из строя. Шаблон, который я использовал для решения этой проблемы в прошлом, заключается в использовании очереди задержки (в системе, которая обрабатывала 8 миллионов сообщений в день в финансовом мире).

В вашем примере, если я получу удаление для заказа, который еще не получил, я бы отложил его на некоторое время и повторил попытку. Если бы я все еще ничего не знал о порядке, который меня просят удалить, я бы поднял какую-то ошибку (ответ исходному отправителю, отправка сообщения в специальную очередь ошибок,...).

Что касается реализации очереди задержки, то это может быть другая очередь JMS со службой, которая может принимать сообщения с задержкой. Затем он периодически читает отложенные сообщения и проверяет, истек ли отсроченный период времени, и повторно отправляет сообщение в исходную очередь назначения.

Второй совет о проверке сайта EAI и книги, на которой он основан (фантастический текст о моделях MOM и MOM).

Лично я бы исследовал Resequencer, хотя.

Как обеспечить последовательность сообщений, полученных mdb? о подобной теме на стороне сервера, где кто-то указывает, что ActiveMQ может иметь решение, которое сохраняет порядок. Я предполагаю, что это делает это специфическим на вкус хотя.

Очередь JMS в целом следует рассматривать как очередь FIFO .
Причины порчи заказа, согласно документации IBM MQ , следующие:

  • Несколько направлений
  • Несколько производителей
  • Несколько потребителей
  • Публикация и подписка (подразумевается несколько экземпляров подписки)

Аналогичные утверждения для ActiveMQ

ActiveMQ сохранит порядок сообщений, отправляемых одним производителем всем потребителям по теме. Если в очереди есть один потребитель, то порядок сообщений, отправляемых одним производителем, также будет сохранен. Если у вас есть несколько потребителей в одной очереди, потребители будут конкурировать за сообщения, и ActiveMQ будет распределять нагрузку между ними, поэтому порядок будет потерян.

Вам нужно обрабатывать сообщения одной и той же группы одним и тем же потоком (последовательно), чтобы не менять их порядок. Kafka обеспечивает интеллектуальное разделение сообщения на основе ключа сообщения . ActiveMQ имеет понятие групп сообщений, которое использует заголовок сообщения.

Рассмотрите пример разделения в потребительском приложении с использованием справедливых замков Java, если вы не можете использовать вышеизложенное. Чтение сообщений из очереди и создание разделов должны быть синхронными, фактическая обработка может быть распараллелена.

      String message;
String messageKey;
ReentrantLock messageKeyLock;
partitioningSupport.getFairLock().lock();
try {
    // use DUPS_OK_ACKNOWLEDGE with deduplication service which improve performance of sequential read
    message = (String) jmsTemplate.receiveAndConvert(QUEUE);
    if (message == null || deduplicationService.deduplicate(md5(message)))
        continue;
    messageKey = findByXPath(path, message)
    messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
} finally {
    partitioningSupport.getFairLock().unlock();
}

messageKeyLock.lock();
try {
    // parallel message processing
} finally {
    messageKeyLock.unlock();
}

С 10 разнообразными ключами (количество уникальных ключей), 10 потребительскими потоками и 255 разделами блокировка примечательна.

При 1000 разнообразии ключей и прочих одинаковых блокировка довольно случайная и не заметная (вероятность ожидания относительно мала).

Реализация

      import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PartitioningSupport {
    private final ConcurrentHashMap<Integer, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ReentrantLock fairLock = new ReentrantLock(true);
    private final int diversity;

    public PartitioningSupport() {
        this(0xff);
    }

    public PartitioningSupport(int diversity) {
        this.diversity = diversity;
    }

    public ReentrantLock getPartitionLock(String messageKey) {
        fairLock.lock();
        try {
            int partition = partition(messageKey);
            ReentrantLock lock = locks.get(partition);
            if (lock == null) {
                lock = new ReentrantLock(true);
                locks.put(partition, lock);
            }
            return lock;
        } finally {
            fairLock.unlock();
        }
    }

    private int partition(String key) {
        return (isBlank(key) ? nextInt() : key.hashCode()) & diversity;
    }

    public ReentrantLock getFairLock() {
        return fairLock;
    }
}

Тест

      import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.lang.System.out;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.jupiter.api.Test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class PartitioningSupportTest {
    private BlockingQueue<String> queue = new LinkedBlockingDeque<>();
    private List<Future<?>> results = new ArrayList<>();
    private ExecutorService consumers = newFixedThreadPool(10, new ThreadFactoryBuilder().setNameFormat("consumer-%s").build());
    private PartitioningSupport partitioningSupport = new PartitioningSupport();
    private volatile ConcurrentHashMap<String, AtomicInteger> ids;
    private int repeatTest = 10;
    private int uniqueKeysCount = 1; // 100
    private int totalMessagesCount = 1000;

    @Test
    public void testProcessingOrder() throws InterruptedException, ExecutionException {
        for (int testIter = 0; testIter < repeatTest; testIter++) {
            ids = new ConcurrentHashMap<>();
            results = new ArrayList<>();

            for (int messageIter = 1; messageIter <= totalMessagesCount; messageIter++) {
                String messageKey = "message-" + nextInt(0, uniqueKeysCount);
                ids.putIfAbsent(messageKey, new AtomicInteger());
                queue.put(format("%s.%s", messageKey, messageIter));
            }

            for (int i = 0; i < totalMessagesCount; i++)
                results.add(consumers.submit(this::consume));

            for (Future<?> result : results)
                result.get();
        }
        consumers.shutdown();
    }

    private void consume() {
        try {
            String message;
            String messageKey;
            ReentrantLock messageKeyLock;
            partitioningSupport.getFairLock().lock();
            try {
                message = queue.take();
                messageKey = message.substring(0, message.indexOf('.'));
                messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
            } finally {
                partitioningSupport.getFairLock().unlock();
            }

            messageKeyLock.lock();
            try {

                sleep(nextLong(1, 10));

                int ordinal = parseInt(message.substring(message.indexOf('.') + 1));
                int previous = ids.get(messageKey).getAndSet(ordinal);
                out.printf("processed: %s - %s%n", messageKey, ordinal);
                assertTrue(ordinal > previous, format("broken order %s [%s -> %s]", messageKey, previous, ordinal));
            } finally {
                messageKeyLock.unlock();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
Другие вопросы по тегам