Неблокирующие транзакции при чтении из очереди ActiveMQ с помощью STOMP
Я взаимодействую с ActiveMQ через STOMP. У меня есть один процесс, который публикует сообщения, и несколько процессов, которые подписываются и обрабатывают сообщения (около 10 параллельных экземпляров).
После прочтения сообщения я хочу быть уверенным, что если по какой-либо причине мое приложение завершится сбоем / сбоем, сообщение не будет потеряно. Поэтому, естественно, я обратился к сделкам. К сожалению, я обнаружил, что как только потребитель читает сообщение как часть транзакции, все последующие сообщения не отправляются другим потребителям, пока транзакция не завершится.
Прецедент: abc
В очереди 100 сообщений. Если я активирую следующий код в двух разных вкладках браузера, первая вернется через 10 секунд, а вторая вернется через 20 секунд.
<?php
// Reader.php
$con = new Stomp("tcp://localhost:61613");
$con->connect();
$con->subscribe(
"/queue/abc",
array()
);
$tx = "tx3".microtime();
echo "TX:$tx<BR>";
$con->begin($tx);
$messages = array();
for ($i = 0; $i < 10; $i++) {
$t = microtime(true);
$msg = $con->readFrame();
if (!$msg) {
die("FAILED!");
}
$t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>";
array_push($messages, $msg);
$con->ack($msg, $tx);
sleep(1);
}
$con->abort($tx);
Есть что-то, что я пропускаю по кодам? Есть ли способ настроить ActiveMQ (или отправить заголовок), который заставит транзакцию удалить элемент из очереди, разрешить другим процессам потреблять другие сообщения, а если транзакция завершится неудачно или истечет время ожидания, вернет элемент в?
PS: я думал о создании другой очереди - DetentionQueue для каждого процесса чтения, но я действительно не хочу этого делать, если у меня есть выбор.
1 ответ
Возможно, вы захотите настроить размер предварительной выборки подписки, чтобы ActiveMQ не отправлял сообщения в очереди клиенту 1 до того, как клиент 2 получит возможность их получить. По умолчанию он установлен на 1000, так что лучше всего настроить его для вашего варианта использования.
Вы можете установить размер предварительной выборки с помощью заголовка "activemq.prefetchSize=1" в кадре подписки. Обратитесь к странице ActiveMQ Stomp для всех опций фрейма.