使用 Apache Camel/Smallrye/reactive 流 - 如何跨 JVM 将 "publisher" 连接到 "subscriber"?

Using Apache Camel/Smallrye/reactive streams - how can I connect a "publisher" to a "subscriber" across JVMs?

下面是我尝试使用 Apache Camel 反应流解决方案跨 JVM

将发布者连接到订阅者(camel 路由的代码如下所示)

要实现跨 JVM 的通信,似乎需要 "broker" 服务器。因此,我实施了 Artemis 代理并相应地修改了 application.properties 文件(根据我对如何操作的最佳理解)。

此外,为了缩小焦点,选择使用 smallrye-ampq 连接器。

问题:

订户应该接收并记录字符串值(来自正文):

-
-
-
:blahblahblah
:blahblahblah
:blahblahblah
-
-
-

--相反,它正在记录值,如下所示:

-
-
-
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
-
-
-

问题:

为什么发布者发送的负载没有到达订阅者,我可以code/configuration修改什么来修复它?

提前感谢任何帮助!

---

"Publisher"路线

package aaa.bbb.ccc.jar;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

@ApplicationScoped
public class CamelPub extends RouteBuilder {

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;
    static int x = 0;

    @Outgoing("data")
    public Publisher<Exchange> source() {
        return crss.from("seda:thesource");
    }

    @Override
    public void configure() {

        crss = CamelReactiveStreams.get(ctx);
        from("timer://thetimer?period=1000")
                .process(new Processor() {
                    @Override
                    public void process(Exchange msg) throws Exception {
                        msg.getIn().setBody("blahblahblah"); //(Integer.toString(x++));
                    }
                })
                .log("....... PUB ....... camelpub - body: ${body}")
                .to("direct:thesource");
    }
}

microprofile-config.properties - 出版商

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8084
server.host=0.0.0.0

mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true

相关控制台日志摘录(?) - 发布者

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelpub ---
2019.12.17 22:26:34 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:26:35 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:26:35 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding:  [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:26:36 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:26:36 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:26:36 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:26:37 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesource is using shared queue: seda://thesource with size: 1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: timer://thetimer?period=1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.191 seconds
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelPub] with qualifiers [@Any @Default]
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelPub
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: seda://thesource
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting method aaa.bbb.ccc.jar.CamelPub#source to sink data
2019.12.17 22:26:37 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container e71e38c0-91ec-4758-a310-55f1368c6a9c initialized
2019.12.17 22:26:37 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:26:37 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:26:37 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@57fbc06f
2019.12.17 22:26:38 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0x52928b67, L:/0:0:0:0:0:0:0:0:8084]
2019.12.17 22:26:38 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8084 (and all other host addresses) in 3668 milliseconds.
2019.12.17 22:26:39 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 1
2019.12.17 22:26:40 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 2
2019.12.17 22:26:41 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 3
2019.12.17 22:26:42 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 4
...

"Subscriber"路线

package aaa.bbb.ccc.jar;

import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@ApplicationScoped
public class CamelSub extends RouteBuilder {

    public CamelSub() throws Exception {
    }

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;

    @Incoming("data")
    public Subscriber<String> sink() {
    return crss.subscriber("seda:thesink", String.class);
    }    

    @Override
    public void configure() {

    crss = CamelReactiveStreams.get(ctx);

    from("seda:thesink")
        .convertBodyTo(String.class)
        .log("ooooooo SUB ooooooo camelsub - body: ${body}");
    }
}

microprofile-config.properties - 订阅者

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8082
server.host=0.0.0.0

mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true

相关控制台日志摘录(?) - 订阅者

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelsub ---
2019.12.17 22:28:09 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:28:10 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding:  [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:28:11 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:28:11 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:28:11 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesink is using shared queue: seda://thesink with size: 1000
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: seda://thesink
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.173 seconds
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelSub] with qualifiers [@Any @Default]
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelSub
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:28:12 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: reactive-streams://ID-LAPTOP-4LR4PMVQ-1576639692145-0-1
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Attempt to resolve aaa.bbb.ccc.jar.CamelSub#sink
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting aaa.bbb.ccc.jar.CamelSub#sink to `data` (org.eclipse.microprofile.reactive.streams.operators.core.PublisherBuilderImpl@3eda0aeb)
2019.12.17 22:28:12 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container c1eaa1fb-486c-4b95-b56b-0f1a7b88f741 initialized
2019.12.17 22:28:12 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:28:12 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:28:12 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@77f905e3
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0xd8f72801, L:/0:0:0:0:0:0:0:0:8082]
2019.12.17 22:28:12 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8082 (and all other host addresses) in 3310 milliseconds.
2019.12.17 22:28:13 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
2019.12.17 22:28:14 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
2019.12.17 22:28:15 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]    
...

注意:上面的输出应该显示数字...而不是 "Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]" 等...:-(

基本上相同的 maven pom.xml 每个

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc</groupId>
    <artifactId>[NOTE: essentially same pom.xml for both camelpub or camelsub]</artifactId>
    <version>1.0</version>
    <properties>
        <helidonVersion>1.4.0</helidonVersion>
        <package>aaa.bbb.ccc.jar</package>
        <failOnMissingWebXml>false</failOnMissingWebXml>
        <mpVersion>3.2</mpVersion>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <libs.classpath.prefix>libs</libs.classpath.prefix>
        <mainClass>io.helidon.microprofile.server.Main</mainClass>
        <jersey.version>2.29</jersey.version>
        <copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
        <camelversion>3.0.0</camelversion>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.microprofile</groupId>
            <artifactId>microprofile</artifactId>
            <version>${mpVersion}</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>1.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>8.0</version>
            <type>jar</type>
        </dependency> 
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camelversion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-reactive-streams</artifactId>
            <version>${camelversion}</version>
        </dependency>       
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-cdi</artifactId>
            <version>${camelversion}</version>
        </dependency>                              
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-provider</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-amqp</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>javax.enterprise</groupId>
            <artifactId>cdi-api</artifactId>
            <version>2.0</version>
        </dependency>          
        <dependency>
            <groupId>io.helidon</groupId>
            <artifactId>helidon-bom</artifactId>
            <version>${helidonVersion}</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.jboss</groupId>
            <artifactId>jandex</artifactId>
            <version>2.1.1.Final</version>
            <scope>runtime</scope>
            <optional>true</optional>            
        </dependency>
        <dependency>
            <groupId>javax.activation</groupId>
            <artifactId>javax.activation-api</artifactId>
            <version>1.2.0</version>
            <scope>runtime</scope>            
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-json-binding</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>io.helidon.microprofile.bundles</groupId>
            <artifactId>helidon-microprofile-3.0</artifactId>
            <version>${helidonVersion}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>[NOTE: essentially same pom.xml for both camelpub or camelsub]</finalName>
        <plugins>                             
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.5</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.9</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${copied.libs.dir}</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <includeScope>runtime</includeScope>
                            <excludeScope>test</excludeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

docker-compose.yml (阿尔忒弥斯)

# A docker compose file to start an Artemis AMQP broker
# more details on https://github.com/vromero/activemq-artemis-docker.
version: '2'

services:

  artemis:
    image: vromero/activemq-artemis:2.8.0-alpine
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    environment:
      ARTEMIS_USERNAME: artuser
      ARTEMIS_PASSWORD: artpassword

使用的技术

java 8
apache camel
smallrye
artemis
reactive streams/programming

(正在使用此 link 作为资源:https://smallrye.io/smallrye-reactive-messaging/

您在 post 中陈述的问题是一个相当常见的用例,具有一些定义明确的模式来解决问题,在这种情况下,合理地涉及设置某种异步消息传递中间件,例如如 Apache ActiveMQ, RabbitMQ, Apache Kafka, etc. Doing so gives you a perfect way to decouple your Camel contexts as mentioned in the article Why Use Multiple Camel Contexts? This concept is further explained in the Apache Camel documentation for the Message Channel EIP (EIP = Enterprise Integration Pattern).

我在上面的 post 中看到您似乎在尝试使用 Camel SEDA。其文档页面指出:

Note that queues are only visible within a single CamelContext. If you want to communicate across CamelContext instances (for example, communicating between Web applications), see the VM component.

This component does not implement any kind of persistence or recovery, if the VM terminates while messages are yet to be processed. If you need persistence, reliability or distributed SEDA, try using either JMS or ActiveMQ.

Camel VM component 在这里也不适合你,因为你的多个 Camel 上下文分布在不同的服务器上。 VM 组件可以 运行 在多个 Camel 上下文之间,但它们必须全部 运行 在同一个 JVM 中才能相互通信。

出于这些原因,在这种情况下,我看不到使用某种消息传递中间件的任何方法。

既然你提到了流式传输,Apache Kafka may be a good choice. I've not worked with this before and couldn't comment much further on it, but I found an article where a fella talks about it (see Reactive Streams for Apache Kafka). Camel has a Kafka Component 之类的东西可以用来将所有东西连接在一起。

不是通用的简单骆驼式答案,而是 RSocket 跨网络实现 RX 编程模型(在 TCP 套接字、HTTP Websocket 等之上)。

https://github.com/rsocket/rsocket-java

它得到 Spring Boot 等应用程序框架的良好支持。但这不是您要求的简单有效示例。

所以,结果是将方法签名从...

@Outgoing("data")
public Publisher<Exchange> source() {
    ...
}

到...

@Outgoing("data")
public Publisher<String> source() {
    ...
}

修复了问题,订阅者现在可以接收并记录发布者

发送的 value/payload