Использование Apache Camel/Smallrye/ реактивных потоков - как я могу подключить "издателя" к "подписчику" через JVM?
Ниже представлена моя попытка решения реактивных потоков Apache Camel для подключения издателя к подписчику (код для маршрутов верблюда показан ниже) через JVM
Кажется, что для обеспечения связи между JVM требуется сервер-брокер. Поэтому я реализовал брокер Artemis и соответствующим образом изменил файлы application.properties (насколько я понимаю, как это сделать).
Кроме того, чтобы сузить фокус, мы решили использовать разъем smallrye-ampq.
Проблема:
Подписчик должен получать и регистрировать значение String (из тела):
-
-
-
:blahblahblah
:blahblahblah
:blahblahblah
-
-
-
- Вместо этого он регистрирует значения, например:
-
-
-
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
-
-
-
Вопрос:
почему данные, отправленные издателем, не достигают подписчика и какой код / конфигурацию я могу изменить, чтобы исправить это?
спасибо заранее за любую помощь!
---
Маршрут "Издатель"
package aaa.bbb.ccc.jar;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@ApplicationScoped
public class CamelPub extends RouteBuilder {
@Inject
CamelContext ctx;
CamelReactiveStreamsService crss;
static int x = 0;
@Outgoing("data")
public Publisher<Exchange> source() {
return crss.from("seda:thesource");
}
@Override
public void configure() {
crss = CamelReactiveStreams.get(ctx);
from("timer://thetimer?period=1000")
.process(new Processor() {
@Override
public void process(Exchange msg) throws Exception {
msg.getIn().setBody("blahblahblah"); //(Integer.toString(x++));
}
})
.log("....... PUB ....... camelpub - body: ${body}")
.to("direct:thesource");
}
}
microprofile-config.properties - издатель
injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8084
server.host=0.0.0.0
mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true
соответствующий отрывок из журнала консоли (?) - издатель
...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelpub ---
2019.12.17 22:26:34 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:26:35 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:26:35 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding: [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:26:36 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:26:36 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:26:36 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:26:37 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesource is using shared queue: seda://thesource with size: 1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: timer://thetimer?period=1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.191 seconds
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelPub] with qualifiers [@Any @Default]
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelPub
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: seda://thesource
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting method aaa.bbb.ccc.jar.CamelPub#source to sink data
2019.12.17 22:26:37 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container e71e38c0-91ec-4758-a310-55f1368c6a9c initialized
2019.12.17 22:26:37 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:26:37 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:26:37 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@57fbc06f
2019.12.17 22:26:38 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0x52928b67, L:/0:0:0:0:0:0:0:0:8084]
2019.12.17 22:26:38 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8084 (and all other host addresses) in 3668 milliseconds.
2019.12.17 22:26:39 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 1
2019.12.17 22:26:40 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 2
2019.12.17 22:26:41 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 3
2019.12.17 22:26:42 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 4
...
маршрут "Абонент"
package aaa.bbb.ccc.jar;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;
@ApplicationScoped
public class CamelSub extends RouteBuilder {
public CamelSub() throws Exception {
}
@Inject
CamelContext ctx;
CamelReactiveStreamsService crss;
@Incoming("data")
public Subscriber<String> sink() {
return crss.subscriber("seda:thesink", String.class);
}
@Override
public void configure() {
crss = CamelReactiveStreams.get(ctx);
from("seda:thesink")
.convertBodyTo(String.class)
.log("ooooooo SUB ooooooo camelsub - body: ${body}");
}
}
microprofile-config.properties - подписчик
injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8082
server.host=0.0.0.0
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true
соответствующий отрывок из журнала консоли (?) - подписчик
...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelsub ---
2019.12.17 22:28:09 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:28:10 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding: [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:28:11 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:28:11 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:28:11 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesink is using shared queue: seda://thesink with size: 1000
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: seda://thesink
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.173 seconds
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelSub] with qualifiers [@Any @Default]
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelSub
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:28:12 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: reactive-streams://ID-LAPTOP-4LR4PMVQ-1576639692145-0-1
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Attempt to resolve aaa.bbb.ccc.jar.CamelSub#sink
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting aaa.bbb.ccc.jar.CamelSub#sink to `data` (org.eclipse.microprofile.reactive.streams.operators.core.PublisherBuilderImpl@3eda0aeb)
2019.12.17 22:28:12 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container c1eaa1fb-486c-4b95-b56b-0f1a7b88f741 initialized
2019.12.17 22:28:12 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:28:12 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:28:12 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@77f905e3
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0xd8f72801, L:/0:0:0:0:0:0:0:0:8082]
2019.12.17 22:28:12 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8082 (and all other host addresses) in 3310 milliseconds.
2019.12.17 22:28:13 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
2019.12.17 22:28:14 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
2019.12.17 22:28:15 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
...
ПРИМЕЧАНИЕ. Приведенный выше вывод должен отображать числа... вместо, например, "Обмен [ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]" и т. Д.:-(
по сути тот же maven pom.xml для каждого
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>aaa.bbb.ccc</groupId>
<artifactId>[NOTE: essentially same pom.xml for both camelpub or camelsub]</artifactId>
<version>1.0</version>
<properties>
<helidonVersion>1.4.0</helidonVersion>
<package>aaa.bbb.ccc.jar</package>
<failOnMissingWebXml>false</failOnMissingWebXml>
<mpVersion>3.2</mpVersion>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<libs.classpath.prefix>libs</libs.classpath.prefix>
<mainClass>io.helidon.microprofile.server.Main</mainClass>
<jersey.version>2.29</jersey.version>
<copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
<camelversion>3.0.0</camelversion>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.microprofile</groupId>
<artifactId>microprofile</artifactId>
<version>${mpVersion}</version>
<type>pom</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<version>1.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>8.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-reactive-streams</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cdi</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId>javax.enterprise</groupId>
<artifactId>cdi-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>io.helidon</groupId>
<artifactId>helidon-bom</artifactId>
<version>${helidonVersion}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jandex</artifactId>
<version>2.1.1.Final</version>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-binding</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.bundles</groupId>
<artifactId>helidon-microprofile-3.0</artifactId>
<version>${helidonVersion}</version>
</dependency>
</dependencies>
<build>
<finalName>[NOTE: essentially same pom.xml for both camelpub or camelsub]</finalName>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
<mainClass>${mainClass}</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${copied.libs.dir}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<excludeScope>test</excludeScope>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
docker-compose.yml (Артемида)
# A docker compose file to start an Artemis AMQP broker
# more details on https://github.com/vromero/activemq-artemis-docker.
version: '2'
services:
artemis:
image: vromero/activemq-artemis:2.8.0-alpine
ports:
- "8161:8161"
- "61616:61616"
- "5672:5672"
environment:
ARTEMIS_USERNAME: artuser
ARTEMIS_PASSWORD: artpassword
используемые технологии
java 8
apache camel
smallrye
artemis
reactive streams/programming
(использовал эту ссылку в качестве ресурса: https://smallrye.io/smallrye-reactive-messaging/)
3 ответа
Итак, получается, что изменение сигнатуры метода с...
@Outgoing("data")
public Publisher<Exchange> source() {
...
}
чтобы...
@Outgoing("data")
public Publisher<String> source() {
...
}
устраняет проблему, так что теперь подписчик получает и регистрирует значение / полезную нагрузку, отправленную издателем
Проблема, которую вы указали в своем сообщении, является довольно распространенным вариантом использования с некоторыми четко определенными шаблонами для решения проблемы, что в этом случае разумно потребует настройки какого-либо промежуточного программного обеспечения для асинхронного обмена сообщениями, такого как Apache ActiveMQ, RabbitMQ, Apache Kafka, и т. д. Это дает вам отличный способ отделить контексты Camel, как упоминалось в статье Зачем использовать несколько контекстов Camel? Эта концепция дополнительно объясняется в документации Apache Camel для EIP канала сообщений (EIP = Enterprise Integration Pattern).
Я вижу в вашем сообщении выше, что вы, похоже, пытаетесь использовать Camel SEDA. На его странице документации указано:
Обратите внимание, что очереди видны только в пределах одного CamelContext. Если вы хотите взаимодействовать между экземплярами CamelContext (например, взаимодействовать между веб-приложениями), см. Компонент виртуальной машины.
Этот компонент не реализует никакого вида сохранения или восстановления, если виртуальная машина завершает работу, пока сообщения еще не обработаны. Если вам нужна настойчивость, надежность или распределенная SEDA, попробуйте использовать JMS или ActiveMQ.
Компонент Camel VM здесь не подойдет вам, так как ваши многочисленные контексты Camel распределены по разным серверам. Компонент виртуальной машины может работать между несколькими контекстами Camel, но все они должны выполняться в одной JVM для взаимодействия.
По этим причинам я не вижу возможности использовать какое-то промежуточное ПО для обмена сообщениями в данном случае.
Поскольку вы упомянули потоковую передачу, хорошим выбором может быть что-то вроде Apache Kafka. Я не работал с этим раньше и не мог больше комментировать это, но я нашел статью, в которой один парень говорит об этом (см. Реактивные потоки для Apache Kafka). У Camel есть компонент Kafka, который можно использовать, чтобы связать все вместе.
Не общий простой ответ верблюда, но RSocket реализует модель программирования RX по сети (поверх сокетов TCP, веб-сокетов HTTP и т. Д.).
https://github.com/rsocket/rsocket-java
Он хорошо поддерживается фреймворками приложений, такими как Spring Boot. Но это не тот простой пример работы, о котором вы просите.