Реализация неблокирующей повторной попытки с откатом с помощью spring-amqp и rabbitmq

Я ищу хороший способ реализовать повторные попытки с помощью политики отката с использованием Spring amqp и Rabbit MQ, но требование заключается в том, что слушатель не должен быть заблокирован (поэтому он может свободно обрабатывать другие сообщения). Я вижу похожий вопрос, задаваемый / отвечаемый здесь, но он не включает решение для "отступления":

RabbitMQ & Spring amqp повторяется без блокировки потребителей

У меня есть следующие вопросы:

  1. Блокирует ли реализация по умолчанию с пружинным повтором потоки при повторной попытке? Реализация в github указывает, что это так.

  2. Если приведенное выше предположение верно, единственный способ сделать это - реализовать отдельную очередь для повторных попыток (DLQ?) И установить TTL для каждого сообщения (при условии, что мы не хотим блокировать потоки для интервала отката).

  3. Если мы воспользуемся описанным выше подходом (DLQ или отдельная очередь), не потребуются ли нам отдельные очереди для каждой попытки повторной попытки? Если мы используем только 1 очередь для повторных попыток, эта же очередь будет содержать сообщения с TTL в диапазоне от минимального интервала повторения до максимального интервала повторения, и если сообщение в начале очереди имеет максимальный TTL, сообщение позади него не будет взял, даже если у него есть мин TTL. Это согласно документации Rabbit MQ TTL здесь (см. Предостережения):

  4. Есть ли другой способ реализовать неблокирующий механизм Backoff Retry?

Добавление некоторой информации о конфигурации для устранения неполадок @garyrussel:

Конфигурация очереди:

    <rabbit:queue name="regular_requests_queue"/>
    <rabbit:queue name="retry_requests_queue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="regular_exchange" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:direct-exchange name="regular_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="regular_requests_queue" key="regular-request-key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:direct-exchange name="retry_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="retry_requests_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="retryRecoverer" class="com.testretry.RetryRecoverer">
         <constructor-arg ref="retryTemplate"/>
         <constructor-arg value="retry_exchange"/>
    </bean>

    <rabbit:template id="templateWithOneRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>
    <rabbit:template id="retryTemplate" connection-factory="connectionFactory" exchange="retry_exchange"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="1"/>
            </bean>
        </property>
    </bean>

3 ответа

Вот окончательное решение, которое я в итоге реализовал. Существует 1 очередь на "интервал повторения", 1 обмен на очередь повторения. Все они передаются в пользовательский RepublishRecoverer, который создает список восстановителей.

Пользовательский заголовок с именем "RetryCount" добавляется в сообщение, и в зависимости от значения "RetryCount" сообщение публикуется в нужном обмене / очереди с другим "сроком действия". Каждая очередь повторов настраивается с помощью DLX, для которого задано значение "регулярный обмен" (т. Е. Запросы направляются в обычную очередь).

<rabbit:template id="genericTemplateWithRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>

<!-- Create as many templates as retryAttempts (1st arg) in customRetryTemplate-->
<rabbit:template id="genericRetryTemplate1" connection-factory="consumerConnFactory" exchange="retry_exchange_1"/>
<rabbit:template id="genericRetryTemplate2" connection-factory="consumerConnFactory" exchange="retry_exchange_2"/>
<rabbit:template id="genericRetryTemplate3" connection-factory="consumerConnFactory" exchange="retry_exchange_3"/>
<rabbit:template id="genericRetryTemplate4" connection-factory="consumerConnFactory" exchange="retry_exchange_4"/>
<rabbit:template id="genericRetryTemplate5" connection-factory="consumerConnFactory" exchange="retry_exchange_5"/>

<rabbit:queue name="regular_requests_queue"/>

<!-- Create as many queues as retryAttempts (1st arg) in customRetryTemplate -->
<rabbit:queue name="retry_requests_queue_1">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_2">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_3">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_4">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_5">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:direct-exchange name="regular_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="regular_requests_queue" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- Create as many exchanges as retryAttempts (1st arg) in customRetryTemplate -->
<rabbit:direct-exchange name="retry_exchange_1">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_1" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_2">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_2" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_3">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_3" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_4">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_4" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_5">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_5" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>


<!-- retry config begin -->
<!-- Pass in all templates and exchanges created as list/array arguments below -->
<bean id="customRetryRecoverer" class="com.test.listeners.CustomRetryRecoverer">
    <!-- Pass in list of templates -->
     <constructor-arg>
        <list>
            <ref bean="genericRetryTemplate1"/>
            <ref bean="genericRetryTemplate2"/>
            <ref bean="genericRetryTemplate3"/>
            <ref bean="genericRetryTemplate4"/>
            <ref bean="genericRetryTemplate5"/>
        </list>
     </constructor-arg>
     <!-- Pass in array of exchanges -->
     <constructor-arg value="retry_exchange_1,retry_exchange_2,retry_exchange_3,retry_exchange_4,retry_exchange_5"/>
     <constructor-arg ref="customRetryTemplate"/>
</bean>

<bean id="retryInterceptor"
      class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="customRetryRecoverer"/>
    <property name="retryOperations" ref="retryTemplate"/>
    <property name="messageKeyGenerator" ref="msgKeyGenerator"/>
</bean>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <!--  Set to 1 - just for the initial attempt -->
            <property name="maxAttempts" value="1"/>
        </bean>
    </property>
</bean>

 <bean id="customRetryTemplate" class="com.test.retry.CustomRetryTemplate">
    <constructor-arg value="5"/> <!-- max attempts -->
    <constructor-arg value="3000"/> <!-- Initial interval -->
    <constructor-arg value="5"/> <!-- multiplier for backoff -->
</bean>

<!-- retry config end -->

Вот код для CustomRetryRecoverer:

public class CustomRetryRecoverer extends
        RepublishMessageRecoverer {

    private static final String RETRY_COUNT_HEADER_NAME = "RetryCount";
    private List<RepublishMessageRecoverer> retryExecutors = new ArrayList<RepublishMessageRecoverer>();
    private TriggersRetryTemplate retryTemplate;

    public TriggersRetryRecoverer(AmqpTemplate[] retryTemplates, String[] exchangeNames, TriggersRetryTemplate retryTemplate) {
        super(retryTemplates[0], exchangeNames[0]);
        this.retryTemplate = retryTemplate;

        //Get lower of the two array sizes
        int executorCount = (exchangeNames.length < retryTemplates.length) ? exchangeNames.length : retryTemplates.length;
        for(int i=0; i<executorCount; i++) {
            createRetryExecutor(retryTemplates[i], exchangeNames[i]);
        }
        //If not enough exchanges/templates provided, reuse the last exchange/template for the remaining retry recoverers
        if(retryTemplate.getMaxRetryCount() > executorCount) {
            for(int i=executorCount; i<retryTemplate.getMaxRetryCount(); i++) {
                createRetryExecutor(retryTemplates[executorCount-1], exchangeNames[executorCount-1]);
            }
        }
    }

    @Override
    public void recover(Message message, Throwable cause) {

        if(getRetryCount(message) < retryTemplate.getMaxRetryCount()) {
            incrementRetryCount(message);

            //Set the expiration of the retry message
            message.getMessageProperties().setExpiration(String.valueOf(retryTemplate.getNextRetryInterval(getRetryCount(message)).longValue()));

            RepublishMessageRecoverer retryRecoverer = null;
            if(getRetryCount(message) != null && getRetryCount(message) > 0) {
                retryRecoverer = retryExecutors.get(getRetryCount(message)-1);
            } else {
                retryRecoverer = retryExecutors.get(0);
            }
            retryRecoverer.recover(message, cause);
        } else {
            //Retries exchausted - do nothing
        }
    }

    private void createRetryExecutor(AmqpTemplate template, String exchangeName) {
        RepublishMessageRecoverer retryRecoverer = new RepublishMessageRecoverer(template, exchangeName);
        retryRecoverer.errorRoutingKeyPrefix(""); //Set KeyPrefix to "" so original key is reused during retries
        retryExecutors.add(retryRecoverer);
    }   

    private Integer getRetryCount(Message msg) {
        Integer retryCount;
        if(msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME) == null) {
            retryCount = 1;
        } else {
            retryCount =  (Integer) msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME);
        }

        return retryCount;
    }

    private void incrementRetryCount(Message msg) {
        Integer retryCount;
        if(msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME) == null) {
            retryCount = 1;
        } else {
            retryCount =  (Integer) msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME)+1;
        }
        msg.getMessageProperties().getHeaders().put(RETRY_COUNT_HEADER_NAME, retryCount);
    }

}

Код для CustomRetryTemplate здесь не публикуется, но он содержит простые переменные для maxRetryCount, initialInterval и множителя.

  1. да
  2. через 4 ...

Вы можете использовать максимальное количество попыток = 1 с подклассом RepublishMessageRecoverer и реализовать additionalHeaders добавить, скажем, заголовок подсчета повторов.

Затем вы можете переиздавать в другую очередь для каждой попытки.

Программа восстановления на самом деле не структурирована для публикации в разных очередях (мы должны это изменить), поэтому вам может потребоваться написать свой собственный программу восстановления и делегировать одну из нескольких RepublishMessageRecoverer,

Подумайте о внесении вашего решения в платформу.

Вы смотрели на плагин rabbitmq delayer, который задерживает сообщения при обмене вместо очереди? Согласно документации, сообщения, отправленные на обмен с задержкой, кажутся постоянными на уровне обмена.

Используя заголовок сообщения с пользовательским счетчиком повторов и обмен отложенными сообщениями, мы можем добиться неблокирующего поведения без уродства этих промежуточных очередей, комбинации dlx и шаблона.

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

Другие вопросы по тегам