Reactor rabbitmq AlreadyClosedException

В моем проекте я использую Springboot версии 2.1.2.RELEASE, реактор-rabbitmq версии 1.0.0.RELEASE. Я создаю кролика, подписываюсь и обрабатываю сообщения вручную. Но через какое-то время это может быть через час или после 1-2 дней работы, я получаю ошибку пропуска сердцебиения, после чего канал закрывается, и я получаю "com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения; причина: java.io.IOException: "Сброс соединения по пиру", и мой получатель больше не получает сообщения. Работает только после перезагрузки.

Кролик-клиент имеет connectionFactory.setAutomaticRecoveryEnabled(true); и connectionFactory.setTopologyRecoveryEnabled(true);

поэтому он должен автоматически восстанавливаться по умолчанию, но он не работает.

public void startReceiver(int parallelism) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setRequestedHeartbeat(10);

    Address[] addresses = {new Address("localhost")};
    ReceiverOptions receiverOptions = new ReceiverOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
            .connectionSubscriptionScheduler(Schedulers.elastic());

    Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
    receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
    .doOnSubscribe(s -> System.out.println("Receiver started."))
    .retry()
    .parallel(parallelism)
    .runOn(Schedulers.newParallel("parallel-receiver", parallelism))
    .doOnNext(d -> processMessage(d))
    .subscribe();
}

private void processMessage(AcknowledgableDelivery message) {
    try {
        //some processing
    } catch (Exception e) {
        e.printStackTrace();
    }
    message.ack();
}

Я получаю ошибки

com.rabbitmq.client.MissedHeartbeatException: сердцебиение отсутствует с сердцебиением = 10 секунд, в com.rabbitmq.client.impl.AMQConnection.handleHeartbeatFailure(AMQConnection.java:686) ~[amqp-client-5.5.1.jar!/:5.5.1], в com.rabbitmq.client.impl.nio.NioLoop.lambda$handleHeartbeatFailure$0(NioLoop.java:273) [amqp-client-5.5.1.jar!/:5.5.1], в java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181],

реактор.rabbitmq.Receiver: Отмена потребителя amq.ctag -x02OWhVo3_DPutsPQ0qDw, потребляющего данные теста,

pipeline.core.Exceptions$ErrorCallbackNotImplemented: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения; Причина: java.io.IOException: Сброс соединения по одноранговому узлу. Причина: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения; причина: java.io.IOException: сброс соединения одноранговым узлом в com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:257) ~[amqp-client-5.5.1.jar!/:5.5.1] на com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:426) ~[amqp-client-5.5.1.jar!/:5.5.1], на com.rabbitmq.client.impl.AMQChannel. передать (AMQChannel.java:420) ~[amqp-client-5.5.1.jar!/:5.5.1] по адресу com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93) ~[amqp-client-5.5.1.jar!/:5.5.1], по адресу com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428) ~[amqp-client-5.5.1.jar!/:5.5.1], в реакторе.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:56) ~[реактор-rabbitmq-1.0.0.RELEASE.jar!/:1.0.0.RELEASE], в реакторе.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:73) ~[реактор-кролик-1.0.0.RELEASE.jar! /: 1.0.0.RELEASE],

0 ответов

Другие вопросы по тегам