Потребитель AMQP перестает получать 127-е сообщение

ПРОБЛЕМА: приложение-издатель постоянно публикует сообщения AMQP для брокера Artemis, но приложение-потребитель перестает получать сообщения после 127-го.

Fwiw - По совпадению, заметил отрывок с веб-страницы Смоллри:

"размер буфера по умолчанию (127) может быть настроен с помощью свойства smallrye.messaging.emitter.default-buffer-size, установленного с помощью MicroProfile Config. Обратите внимание, что это значение применяется только тогда, когда OnOverflow не используется".

(это с: https://smallrye.io/smallrye-reactive-messaging/ - заголовок раздела "14.3. @Channel")

Однако я попытался добавить изменение свойства (чтобы посмотреть, смогу ли я хотя бы повлиять на изменение числа "127") с помощью microprofile-config.properties, но безрезультатно.

то есть ни один из них не работал...

   smallrye.messaging.emitter.default-buffer-size=10000
      or
   mp.messaging.incoming.data.default-buffer-size=10000

консоль издателя (сервера):

2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 119
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 120
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 121
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 122
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 123
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 124
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 125
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 126
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 127
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 128
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 129
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 130
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 131
2020.01.02 23:46:49 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 132
2020.01.02 23:46:49 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 133
(continues to send)

абонентская (серверная) консоль:

2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 119
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 120
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 121
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 122
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 123
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 124
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 125
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 126
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub ==body==> 127
(stops receiving on this message)

Пытался манипулировать свойствами "кредитов" в файле конфигурации Artemis: broker.xml... в частности "приемником" для AMQP - но безрезультатно:

<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP;autoAcknowledge=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpCredits=1000;amqpLowCredits=300</acceptor> 

Приложение для издателя:

package aaa.bbb.ccc.jar;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

@ApplicationScoped
public class CamelPub extends RouteBuilder {

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;
    static int x = 0;

    @Outgoing("data")
    public Publisher<String> source() {
        return crss.from("seda:thesource", String.class);
    }

    @Override
    public void configure() {

    onException(Exception.class)
        .log("--------------------(PUBLISHER: Exception!)-------------------->>> exception=" + this.exceptionMessage().toString());         

    crss = CamelReactiveStreams.get(ctx);
    from("timer://thetimer?period=100")
        .process(new Processor() {
            @Override
            public void process(Exchange msg) throws Exception {
                msg.getIn().setBody(Integer.toString(x++)); 
            }
        })
        .log("....... PUB ....... camelpub - body: ${body}")
        .to("seda:thesource");       
    }
}

src / main / resources / META-INF / microprofile-config.properties (издатель):

injected.value=Injected value
value=lookup value
server.port=8084
server.host=0.0.0.0

mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true
mp.messaging.outgoing.data.containerId=my-container-id
mp.messaging.outgoing.data.broadcast=true

Приложение для подписчиков:

package aaa.bbb.ccc.jar;

import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@ApplicationScoped
public class CamelSub extends RouteBuilder {

    public CamelSub() throws Exception {
    }

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;

    @Incoming("data")
    public Subscriber<String> sink() {
        return crss.subscriber("seda:thesink", String.class);
    }

    @Override
    public void configure() {
    onException(Exception.class)
        .log("--------------------(SUBSCRIBER: Exception!)-------------------->>> exception=" + this.exceptionMessage().toString());
    crss = CamelReactiveStreams.get(ctx);
    from("seda:thesink")
        .log("ooooooo SUB ooooooo camelsub ==body==> ${body}");
    }
}

src / main / resources / META-INF / microprofile-config.properties (подписчик):

injected.value=Injected value
value=lookup value
server.port=8082
server.host=0.0.0.0

mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true
mp.messaging.incoming.data.containerId=my-container-id
mp.messaging.incoming.data.retry=true
mp.messaging.incoming.data.retry-attempts=5

pom.xml информация для pub и sub (по сути дублирует - pub показан ниже):

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc</groupId>
    <artifactId>camelpub</artifactId>
    <version>1.0</version>
    <properties>
        <helidonVersion>1.4.0</helidonVersion>
        <package>aaa.bbb.ccc.jar</package>
        <failOnMissingWebXml>false</failOnMissingWebXml>
        <mpVersion>3.2</mpVersion>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <libs.classpath.prefix>libs</libs.classpath.prefix>
        <mainClass>io.helidon.microprofile.server.Main</mainClass>
        <jersey.version>2.29</jersey.version>
        <copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
        <camelversion>3.0.0</camelversion>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.microprofile</groupId>
            <artifactId>microprofile</artifactId>
            <version>${mpVersion}</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>1.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>8.0</version>
            <type>jar</type>
        </dependency> 
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camelversion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-reactive-streams</artifactId>
            <version>${camelversion}</version>
        </dependency>       
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-cdi</artifactId>
            <version>${camelversion}</version>
        </dependency>                              
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-provider</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-amqp</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>javax.enterprise</groupId>
            <artifactId>cdi-api</artifactId>
            <version>2.0</version>
        </dependency>          
        <dependency>
            <groupId>io.helidon</groupId>
            <artifactId>helidon-bom</artifactId>
            <version>${helidonVersion}</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.jboss</groupId>
            <artifactId>jandex</artifactId>
            <version>2.1.1.Final</version>
            <scope>runtime</scope>
            <optional>true</optional>            
        </dependency>
        <dependency>
            <groupId>javax.activation</groupId>
            <artifactId>javax.activation-api</artifactId>
            <version>1.2.0</version>
            <scope>runtime</scope>            
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-json-binding</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>io.helidon.microprofile.bundles</groupId>
            <artifactId>helidon-microprofile-3.0</artifactId>
            <version>${helidonVersion}</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>camelpub</finalName>
        <plugins>                             
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.5</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.9</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${copied.libs.dir}</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <includeScope>runtime</includeScope>
                            <excludeScope>test</excludeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Apache Artemis broker.xml:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <persistence-enabled>true</persistence-enabled>
      <journal-type>NIO</journal-type>
      <paging-directory>data/paging</paging-directory>
      <bindings-directory>data/bindings</bindings-directory>
      <journal-directory>data/journal</journal-directory>
      <large-messages-directory>data/large-messages</large-messages-directory>
      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
      <journal-device-block-size>4096</journal-device-block-size>
      <journal-file-size>10M</journal-file-size>      
      <journal-buffer-timeout>1316000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>
      <disk-scan-period>5000</disk-scan-period>
      <max-disk-usage>90</max-disk-usage>
      <critical-analyzer>true</critical-analyzer>
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
      <critical-analyzer-policy>HALT</critical-analyzer-policy>

    <!--
      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
      </acceptors>  
    -->

      <acceptors>
         <acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;consumerWindowSize=-1;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=100;amqpLowCredits=20</acceptor>
      </acceptors>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>
      </addresses>

      <!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events -->
      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="true"/>
            <property key="LOG_CONSUMER_EVENTS" value="true"/>
            <property key="LOG_DELIVERING_EVENTS" value="true"/>
            <property key="LOG_SENDING_EVENTS" value="true"/>
            <property key="LOG_INTERNAL_EVENTS" value="true"/>
         </broker-plugin>
      </broker-plugins>

   </core>
</configuration>

Используемые технологии:

  • Java 11
  • Apache Camel 3.0
  • API реактивного обмена сообщениями Smallrye
  • Брокер Apache Artemis 2.10.1

Заметки:

  • Мы используем отдельные серверы Helidon для приложения издателя и серверного приложения соответственно.
  • Та же проблема возникает при использовании брокера Qpid.

0 ответов

Другие вопросы по тегам