如何使用 spring 从 kafka 入站消息中提取数据
How to extract the data from a kafka inbound message with spring
我正在使用 spring 构建 Kafka 消费者。我的配置似乎非常简单。消息正在被消费并保存在文件中。但是,有效载荷是神秘的,我无法获取数据(短 "hello world" 消息)。
这是我访问有效载荷时得到的(例如,当我在入站 kafka 和出站文件之间设置一个转换器 bean 时:
{test-topic={0=[[B@713c9d72, [B@7d656f90, [B@26bb8c83, [B@4b959d83 [B@5ed74e8e]}}
我的问题是:如何访问实际负载("hellow world" 字符串")?
我的配置是:
<int:channel id="inputFromKafka">
<int:queue />
</int:channel>
<int:poller
max-messages-per-poll="5" default = "true" fixed-delay="10" time-unit="MILLISECONDS"/>
<int-kafka:inbound-channel-adapter
id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="40000" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="group12" max-messages="5">
<int-kafka:topic id="test-topic" streams="1" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="localhost:2181" zk-connection-timeout="6000"
zk-session-timeout="6000" zk-sync-time="2000" />
<file:outbound-channel-adapter id="filesOut"
directory="/tmp/fromKafka">
</file:outbound-channel-adapter>
您看到的是原始 byte[]
。
添加...
<bean id="decoder"
class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
和
<int-kafka:consumer-configuration
value-decoder="decoder"
...
我正在使用 spring 构建 Kafka 消费者。我的配置似乎非常简单。消息正在被消费并保存在文件中。但是,有效载荷是神秘的,我无法获取数据(短 "hello world" 消息)。
这是我访问有效载荷时得到的(例如,当我在入站 kafka 和出站文件之间设置一个转换器 bean 时:
{test-topic={0=[[B@713c9d72, [B@7d656f90, [B@26bb8c83, [B@4b959d83 [B@5ed74e8e]}}
我的问题是:如何访问实际负载("hellow world" 字符串")?
我的配置是:
<int:channel id="inputFromKafka">
<int:queue />
</int:channel>
<int:poller
max-messages-per-poll="5" default = "true" fixed-delay="10" time-unit="MILLISECONDS"/>
<int-kafka:inbound-channel-adapter
id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="40000" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="group12" max-messages="5">
<int-kafka:topic id="test-topic" streams="1" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="localhost:2181" zk-connection-timeout="6000"
zk-session-timeout="6000" zk-sync-time="2000" />
<file:outbound-channel-adapter id="filesOut"
directory="/tmp/fromKafka">
</file:outbound-channel-adapter>
您看到的是原始 byte[]
。
添加...
<bean id="decoder"
class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
和
<int-kafka:consumer-configuration
value-decoder="decoder"
...