How to prevent a Kafka-to-MQTT Apache Camel route from failing?
I have a system that distributes Kafka stream data to MQTT topics. The Apache Camel route looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="KafkaToMQTT">
    <from uri="kafka://mqtt?brokers=localhost:9092" />
    <to uri="micrometer:timer:camel.proxy.kafka.mqtt.stream?action=start" />
    <toD uri="mqtt:mqtt?host=tcp://localhost:1883&amp;publishTopicName=${header.kafka.KEY}" />
    <to uri="log://camel.proxy?groupInterval=100&amp;level=INFO" />
    <to uri="micrometer:timer:camel.proxy.kafka.mqtt.stream?action=stop" />
  </route>
</routes>
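Once data starts flowing, everything works for 7 of the 10 topics, but for messages that belong to the other 3 topics (which ones differ between runs, but stay the same within a single Camel route run) I get error messages with a stack trace like this: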
19:19:34.195 [Camel (camel-1) thread #4 - KafkaConsumer[mqtt]] ERROR o.a.c.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-camel-proxy-586888f9b6-dfnwp-1652296571794-0-14406 on ExchangeId: ID-camel-proxy-586888f9b6-dfnwp-1652296571794-0-14404). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Already connected
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[KafkaToMQTT ] [KafkaToMQTT ] [kafka://mqtt?brokers=kafka%3A9092&groupId=mqtt ] [ 0]
[KafkaToMQTT ] [to85 ] [micrometer:timer:camel.proxy.kafka.mqtt.stream?action=start ] [ 0]
[KafkaToMQTT ] [toD1 ] [ ] [ 0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: Already connected
at org.fusesource.mqtt.client.CallbackConnection.connect(CallbackConnection.java:135)
at org.apache.camel.component.mqtt.MQTTEndpoint.connect(MQTTEndpoint.java:315)
at org.apache.camel.component.mqtt.MQTTProducer.ensureConnected(MQTTProducer.java:108)
at org.apache.camel.component.mqtt.MQTTProducer.process(MQTTProducer.java:39)
at org.apache.camel.processor.SendDynamicProcessor.doInAsyncProducer(SendDynamicProcessor.java:178)
at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:445)
at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:160)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
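It is not clear to me which interaction breaks. If it is just a resource bottleneck, such as an undersized connection pool, what can be configured to increase the resources?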
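This looks like an issue related to <toD/>, due to its dynamic nature. Replacing it with an explicit switch that checks for pre-defined values made everything work: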
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="KafkaToMQTT">
    <from uri="kafka://mqtt?brokers=kafka:9092"/>
    <to uri="micrometer:timer:camel.proxy.kafka.mqtt.stream?action=start"/>
    <!-- one static MQTT endpoint per expected Kafka key -->
    <choice>
      <when>
        <simple>${header.kafka.KEY} == "ExpectedValue1"</simple>
        <to uri="mqtt:mqtt?host=tcp://mqtt:1883&amp;publishTopicName=ExpectedValue1"/>
      </when>
      <when>
        <simple>${header.kafka.KEY} == "ExpectedValue2"</simple>
        <to uri="mqtt:mqtt?host=tcp://mqtt:1883&amp;publishTopicName=ExpectedValue2"/>
      </when>
    </choice>
    <to uri="log://camel.proxy?groupInterval=100&amp;level=INFO"/>
    <to uri="micrometer:timer:camel.proxy.kafka.mqtt.stream?action=stop"/>
  </route>
</routes>
This is not very general, but it works for my goals.
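A possibly more general variant, which I have not verified against this setup: <toD/> resolves a separate endpoint for every distinct URI, so each topic gets its own MQTT endpoint. The sketch below keeps a single static endpoint and passes the topic in a message header instead. It assumes a Camel 2.x camel-mqtt version whose producer honors the CamelMQTTPublishTopic header (check the component documentation for your version), and it reuses the broker addresses from the route above.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="KafkaToMQTT">
    <from uri="kafka://mqtt?brokers=kafka:9092"/>
    <to uri="micrometer:timer:camel.proxy.kafka.mqtt.stream?action=start"/>
    <!-- Assumption: the MQTT producer reads the publish topic from this header -->
    <setHeader headerName="CamelMQTTPublishTopic">
      <simple>${header.kafka.KEY}</simple>
    </setHeader>
    <!-- A single static endpoint, so no endpoint is created per topic -->
    <to uri="mqtt:mqtt?host=tcp://mqtt:1883"/>
    <to uri="log://camel.proxy?groupInterval=100&amp;level=INFO"/>
    <to uri="micrometer:timer:camel.proxy.kafka.mqtt.stream?action=stop"/>
  </route>
</routes>
```

If the header is ignored by your version, the endpoint's own publishTopicName (or its default) would be used for every message, so it is worth checking the published topics before relying on this.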