我的 Camel 路由应用程序检测到其他应用程序发布了 amqp 消息(?),但是无法处理,出现 "no type converter available" 错误。我该如何解决?

My Camel route app detects other app's published ampq message(?), but, fails to handle, with "no type converter available" error. How can I resolve?

我的 Camel 路由应用程序检测到其他应用程序发布的 ampq 消息(发布数字),但是无法处理,出现 "no type converter available" 错误。我该如何解决?

"org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79"

路线建设者class

package aaa.bbb.ccc.qscx;

import java.io.IOException;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@Startup
@ApplicationScoped
public class TheRoutes extends RouteBuilder {

    @Inject
    TheProcessor theProcessor;

    @Inject
    CamelContext ctx;

    @Inject
    CamelReactiveStreamsService crss;

    @Override
    public void configure() throws IOException, InterruptedException {
        from("reactive-streams:in")
                .process(theProcessor)
                .log(".........from reactive-streams:in - body:  ${body}");
    }

    @Incoming("prices")
    public Subscriber<String> sink() {
        return crss.subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
    }
}   

pom.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc </groupId>
    <artifactId>qscx</artifactId>
    <version>1.0</version>
    <properties>
        <compiler-plugin.version>3.8.1</compiler-plugin.version>
        <maven.compiler.parameters>true</maven.compiler.parameters>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <quarkus-plugin.version>1.0.0.CR2</quarkus-plugin.version>
        <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
        <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
        <quarkus.platform.version>1.0.0.CR2</quarkus.platform.version>
        <surefire-plugin.version>2.22.1</surefire-plugin.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>${quarkus.platform.group-id}</groupId>
                <artifactId>${quarkus.platform.artifact-id}</artifactId>
                <version>${quarkus.platform.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-junit5</artifactId>
            <scope>test</scope>
        </dependency>  
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-resteasy</artifactId>
        </dependency>
        <dependency>
            <groupId>io.rest-assured</groupId>
            <artifactId>rest-assured</artifactId>
            <scope>test</scope>
        </dependency>                 
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging</artifactId>
        </dependency>                
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-timer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-artemis-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-bean</artifactId>
        </dependency>    
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-streams-operators</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-support-common</artifactId>
        </dependency>   
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-camel</artifactId>
            <version>1.0.8</version>
        </dependency>        
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.26</version>
        </dependency>                                                                                         
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>io.quarkus</groupId>
                <artifactId>quarkus-maven-plugin</artifactId>
                <version>${quarkus-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>build</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${compiler-plugin.version}</version>
            </plugin>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${surefire-plugin.version}</version>
                <configuration>
                    <systemProperties>
                        <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                    </systemProperties>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>native</id>
            <activation>
                <property>
                    <name>native</name>
                </property>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <artifactId>maven-failsafe-plugin</artifactId>
                        <version>${surefire-plugin.version}</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>integration-test</goal>
                                    <goal>verify</goal>
                                </goals>
                                <configuration>
                                    <systemProperties>
                                        <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
                                    </systemProperties>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
            <properties>
                <quarkus.package.type>native</quarkus.package.type>
            </properties>
        </profile>
    </profiles>
    <name>qscx</name>
</project>

application.properties

amqp-username=quarkus
amqp-password=quarkus
mp.messaging.incoming.prices.address=prices
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.host=localhost
mp.messaging.incoming.prices.port=5672
mp.messaging.incoming.prices.username=quarkus
mp.messaging.incoming.prices.password=quarkus
mp.messaging.incoming.prices.broadcast=true
mp.messaging.incoming.prices.containerId=my-container-id

控制台堆栈跟踪(摘录)

2019-11-22 22:31:22,930 WARN  [org.apa.cam.com.rea.str.ReactiveStreamsConsumer] (Camel (camel-1) thread #1 - reactive-streams://DCE85ACAC992C3A-0000000000000000) Error processing exchange. Exchange[DCE85ACAC992C3A-000000000000005F]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot store file: .\target\values.txt]: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: .\target\values.txt
        at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:376)
        at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:300)
        at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:164)
        at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:75)
        at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:67)
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
        at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
        at org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer.lambda$doSend(ReactiveStreamsConsumer.java:96)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: 79 of type: java.lang.Integer on: Message[]. Caused by: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79. Exchange[DCE85ACAC992C3A-000000000000005F]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79]
        at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:115)
        at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:355)
        ... 14 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:139)
        at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:113)
        ... 15 more

其他备注

发布应用基于 Quark Ampq 示例:
https://github.com/quarkusio/quarkus-quickstarts/tree/master/amqp-quickstart/src/main/java/org/acme/quarkus/sample

技术

java 8

夸克斯

小黑麦

驼色

行家

看来subscriber目前没有执行类型转换。可能会在以后的版本中解决。

同时,你需要在路由中强制执行:

@Override
public void configure() throws IOException, InterruptedException {
    from("direct:proc")
        .convertBodyTo(String.class)
        .to("file:./target?fileName=values.txt&fileExist=append");
}

@Incoming("prices")
public Subscriber<String> sink() {
    return crss.subscriber("direct:proc", String.class);
}