NMS ActiveMQ игнорирует ограничение предварительной выборки, установленное в коде

Я использую текущий Apache.NMS 1.7.1 и Apache.NMS.ActiveMQ 1.7.2. я использую IndividualAcknowledgeпоэтому я пытаюсь сохранить количество загруженных сообщений достаточно низким, потому что оно получается очень медленным, если у меня загружено >>1000 сообщений без их отслеживания (каждый раз выполняется поиск в связанном списке всех сообщений).

У меня есть следующие codenippets:

BlockingCollection<IMessage> _collection = new BlockingCollection<IMessage>();
var factory = new ConnectionFactory("activemq:tcp://localhost:61616");
var _connection = (Connection) factory.CreateConnection();
_connection.PrefetchPolicy.All = 1000;
var session = (Session) _connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
var destination = SessionUtil.GetDestination(session, "queue://testQueue");
var messageConsumer = (MessageConsumer)session.CreateConsumer(destination);
messageConsumer.Listener += message => _collection.Add(message);
_connection.Start();

Очередь testQueue содержит >>20_000 сообщений. Подождав несколько секунд, _collection содержит все сообщения, без моего подтверждения любого из них.

Если я правильно понимаю документацию, мне нужно набрать не более 1000, пока я не начну их признавать.

Как только брокер отправил потребителю предельное количество сообщений предварительной выборки, он не будет отправлять больше сообщений этому потребителю, пока потребитель не подтвердит, по крайней мере, 50% предварительно выбранных сообщений, например, prefetch/2, которые он получил. Когда брокер получил указанные подтверждения, он отправит потребителю дополнительное количество сообщений prefetch / 2 для "пополнения" своего буфера предварительной выборки.

Я также попробовал некоторые варианты, как только настройки QueuePrefetch или установив политику в URL:

activemq:tcp://localhost:61616?nms.prefetchPolicy.queuePrefetch=100

или в очереди:

queue://testQueue?consumer.prefetchSize=100

Что касается медлительности IndividualAcknowledgeЯ уже попробовал несколько других вариантов без особой удачи:

messageConsumer.OptimizeAcknowledge = true;
messageConsumer.OptimizeAcknowledgeTimeOut = 1000;
messageConsumer.OptimizedAckScheduledAckInterval = 500;

Хотя мне не совсем понятны различия между последними вариантами.

1 ответ

Поскольку вы используете асинхронный прослушиватель, брокеру будет отправлено вам все, поскольку клиент продолжает предоставлять кредит посреднику при доставке каждого сообщения вашему асинхронному прослушивателю событий. Чтобы действительно ограничить количество сообщений, доставляемых клиенту в любой момент времени, клиент должен использовать синхронный прием вызовов. Индивидуальное подтверждение лучше всего сочетать с синхронным потреблением, так что вы можете контролировать количество прочитанных сообщений и подтверждать их в определенный момент времени, когда они будут готовы.

Оптимизированные настройки подтверждения не применяются в режиме индивидуального подтверждения, что не влияет на производительность.

Другие вопросы по тегам