gpt4 book ai didi

java - 使用 Apache Camel/Smallrye/reactive 流 - 如何跨 JVM 将 "publisher"连接到 "subscriber"?

转载 作者:行者123 更新时间:2023-12-02 03:24:40 24 4
gpt4 key购买 nike

下面是我尝试使用 Apache Camel 响应式(Reactive)流解决方案,跨 JVM 将发布者连接到订阅者(Camel 路由的代码如下所示)

为了实现跨 JVM 的通信,似乎需要一个“代理”服务器。因此,我实现了 Artemis 代理并相应地修改了 application.properties 文件(根据我对如何执行此操作的最佳理解)。

此外,为了缩小焦点,我们选择使用smallrye-ampq 连接器。

问题:

订阅者应该接收并记录字符串值(来自正文):

-
-
-
:blahblahblah
:blahblahblah
:blahblahblah
-
-
-

--相反,它正在记录值,如下所示:

-
-
-
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
-
-
-

问题:

为什么发布者发送的有效负载无法到达订阅者?我可以修改哪些代码/配置来修复它?

提前感谢您的帮助!

---

“发布商”路线

package aaa.bbb.ccc.jar;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

@ApplicationScoped
public class CamelPub extends RouteBuilder {

@Inject
CamelContext ctx;

CamelReactiveStreamsService crss;
static int x = 0;

@Outgoing("data")
public Publisher<Exchange> source() {
return crss.from("seda:thesource");
}

@Override
public void configure() {

crss = CamelReactiveStreams.get(ctx);
from("timer://thetimer?period=1000")
.process(new Processor() {
@Override
public void process(Exchange msg) throws Exception {
msg.getIn().setBody("blahblahblah"); //(Integer.toString(x++));
}
})
.log("....... PUB ....... camelpub - body: ${body}")
.to("direct:thesource");
}
}

microprofile-config.properties - 发布者

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8084
server.host=0.0.0.0

mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true

相关控制台日志摘录(?) - 发布者

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelpub ---
2019.12.17 22:26:34 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:26:35 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:26:35 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding: [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:26:36 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:26:36 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:26:36 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:26:37 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesource is using shared queue: seda://thesource with size: 1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: timer://thetimer?period=1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.191 seconds
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelPub] with qualifiers [@Any @Default]
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelPub
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: seda://thesource
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting method aaa.bbb.ccc.jar.CamelPub#source to sink data
2019.12.17 22:26:37 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container e71e38c0-91ec-4758-a310-55f1368c6a9c initialized
2019.12.17 22:26:37 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:26:37 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:26:37 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@57fbc06f
2019.12.17 22:26:38 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0x52928b67, L:/0:0:0:0:0:0:0:0:8084]
2019.12.17 22:26:38 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8084 (and all other host addresses) in 3668 milliseconds.
2019.12.17 22:26:39 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 1
2019.12.17 22:26:40 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 2
2019.12.17 22:26:41 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 3
2019.12.17 22:26:42 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 4
...

“订阅者”路线

package aaa.bbb.ccc.jar;

import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@ApplicationScoped
public class CamelSub extends RouteBuilder {

public CamelSub() throws Exception {
}

@Inject
CamelContext ctx;

CamelReactiveStreamsService crss;

@Incoming("data")
public Subscriber<String> sink() {
return crss.subscriber("seda:thesink", String.class);
}

@Override
public void configure() {

crss = CamelReactiveStreams.get(ctx);

from("seda:thesink")
.convertBodyTo(String.class)
.log("ooooooo SUB ooooooo camelsub - body: ${body}");
}
}

microprofile-config.properties - 订阅者

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8082
server.host=0.0.0.0

mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true

相关控制台日志摘录(?) - 订阅者

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelsub ---
2019.12.17 22:28:09 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:28:10 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding: [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:28:11 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:28:11 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:28:11 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesink is using shared queue: seda://thesink with size: 1000
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: seda://thesink
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.173 seconds
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelSub] with qualifiers [@Any @Default]
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelSub
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:28:12 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: reactive-streams://ID-LAPTOP-4LR4PMVQ-1576639692145-0-1
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Attempt to resolve aaa.bbb.ccc.jar.CamelSub#sink
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting aaa.bbb.ccc.jar.CamelSub#sink to `data` (org.eclipse.microprofile.reactive.streams.operators.core.PublisherBuilderImpl@3eda0aeb)
2019.12.17 22:28:12 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container c1eaa1fb-486c-4b95-b56b-0f1a7b88f741 initialized
2019.12.17 22:28:12 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:28:12 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:28:12 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@77f905e3
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0xd8f72801, L:/0:0:0:0:0:0:0:0:8082]
2019.12.17 22:28:12 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8082 (and all other host addresses) in 3310 milliseconds.
2019.12.17 22:28:13 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
2019.12.17 22:28:14 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
2019.12.17 22:28:15 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
...

注意:上面的输出应该显示数字...而不是例如“Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]”等... :-(

每个的 maven pom.xml 本质上是相同的

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>aaa.bbb.ccc</groupId>
<artifactId>[NOTE: essentially same pom.xml for both camelpub or camelsub]</artifactId>
<version>1.0</version>
<properties>
<helidonVersion>1.4.0</helidonVersion>
<package>aaa.bbb.ccc.jar</package>
<failOnMissingWebXml>false</failOnMissingWebXml>
<mpVersion>3.2</mpVersion>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<libs.classpath.prefix>libs</libs.classpath.prefix>
<mainClass>io.helidon.microprofile.server.Main</mainClass>
<jersey.version>2.29</jersey.version>
<copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
<camelversion>3.0.0</camelversion>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.microprofile</groupId>
<artifactId>microprofile</artifactId>
<version>${mpVersion}</version>
<type>pom</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<version>1.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>8.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-reactive-streams</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cdi</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId>javax.enterprise</groupId>
<artifactId>cdi-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>io.helidon</groupId>
<artifactId>helidon-bom</artifactId>
<version>${helidonVersion}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jandex</artifactId>
<version>2.1.1.Final</version>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-binding</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.bundles</groupId>
<artifactId>helidon-microprofile-3.0</artifactId>
<version>${helidonVersion}</version>
</dependency>
</dependencies>
<build>
<finalName>[NOTE: essentially same pom.xml for both camelpub or camelsub]</finalName>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
<mainClass>${mainClass}</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${copied.libs.dir}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<excludeScope>test</excludeScope>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

docker-compose.yml(Artemis)

# A docker compose file to start an Artemis AMQP broker
# more details on https://github.com/vromero/activemq-artemis-docker.
version: '2'

services:

artemis:
image: vromero/activemq-artemis:2.8.0-alpine
ports:
- "8161:8161"
- "61616:61616"
- "5672:5672"
environment:
ARTEMIS_USERNAME: artuser
ARTEMIS_PASSWORD: artpassword

使用的技术

java 8
apache camel
smallrye
artemis
reactive streams/programming

(正在使用此链接作为资源:https://smallrye.io/smallrye-reactive-messaging/)

最佳答案

您在帖子中提到的问题是一个相当常见的用例,具有一些定义明确的模式来解决问题,在这种情况下,这将合理地涉及设置某种异步消息传递中间件,例如 Apache ActiveMQ , RabbitMQ , Apache Kafka等等。这样做可以为您提供一种完美的方式来解耦您的 Camel 上下文,如文章 Why Use Multiple Camel Contexts? 中所述。 Apache Camel 进一步解释了这个概念。 Message Channel EIP 的文档(EIP = Enterprise Integration Pattern)。

我在上面的帖子中看到您似乎正在尝试使用 Camel SEDA 。其文档页面指出:

Note that queues are only visible within a single CamelContext. If you want to communicate across CamelContext instances (for example, communicating between Web applications), see the VM component.

This component does not implement any kind of persistence or recovery, if the VM terminates while messages are yet to be processed. If you need persistence, reliability or distributed SEDA, try using either JMS or ActiveMQ.

Camel VM component在这里也不适合你,因为你的多个 Camel 上下文分布在不同的服务器上。 VM组件可以在多个Camel上下文之间运行,但它们必须都在同一个JVM中运行才能相互通信。

出于这些原因,在这种情况下,我看不到任何使用某种消息传递中间件的方法。

既然你提到了流媒体,比如 Apache Kafka可能是一个不错的选择。我以前没有处理过这个问题,也无法对此进行进一步评论,但我发现了一篇文章,其中一个家伙谈论了它(请参阅 Reactive Streams for Apache Kafka )。 Camel 有一个 Kafka Component可以用来将所有东西连接在一起。

关于java - 使用 Apache Camel/Smallrye/reactive 流 - 如何跨 JVM 将 "publisher"连接到 "subscriber"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59185396/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com