如何使用 xml 配置升级到 spring-integration-kafka 2.1.0.RELEASE?

How to upgrade to spring-integration-kafka 2.1.0.RELEASE with xml configuration?

我正在将 spring-integration-kafka 从 1.0.0.M2 升级到 2.1.0.RELEASE,并将 Kafka 客户端从 0.9.0 升级到 0.10.0。

当前xml配置如下

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Integration Kafka wiring for the "trigger-fcm-notification" topic,
    expressed with the producer-context / consumer-context / zookeeper-connect
    namespace elements.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- ===================== Producer side ===================== -->

    <!-- Channel drained by the outbound adapter declared right below. -->
    <int:publish-subscribe-channel id="inputToKafka"/>

    <!-- Sends messages from "inputToKafka" through the "kafkaProducerContext". -->
    <int-kafka:outbound-channel-adapter
            channel="inputToKafka"
            kafka-producer-context-ref="kafkaProducerContext"
            auto-startup="true"
            order="1"/>

    <!-- Additional producer settings; all values come from property placeholders. -->
    <bean id="producerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">${topic.metadata.refresh.interval.ms}</prop>
                <prop key="message.send.max.retries">${message.send.max.retries}</prop>
                <prop key="send.buffer.bytes">${send.buffer.bytes}</prop>
            </props>
        </property>
    </bean>

    <!-- Avro reflect encoder constructed for common.vo.NotificationVo. -->
    <bean id="fcmNotificationEncoder"
          class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
        <constructor-arg value="common.vo.NotificationVo"/>
    </bean>

    <!--
        NOTE(review): value-class-type below names common.vo.fcmNotificationVo,
        while the encoder above is constructed for common.vo.NotificationVo;
        confirm which class name is intended.
    -->
    <int-kafka:producer-context id="kafkaProducerContext"
                                producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                    topic="trigger-fcm-notification"
                    broker-list="${kafka.servers}"
                    key-class-type="java.lang.String"
                    value-class-type="common.vo.fcmNotificationVo"
                    value-encoder="fcmNotificationEncoder"
                    compression-codec="none"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    <!-- ===================== Consumer side ===================== -->

    <!-- Hands messages arriving on the inbound channel to the "fcmNotificationConsumer" bean. -->
    <int:service-activator
            input-channel="ip-chanel-trigger-fcm-notification"
            ref="fcmNotificationConsumer"/>

    <!-- Additional consumer settings; all values come from property placeholders. -->
    <bean id="consumerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">${auto.offset.reset}</prop>
                <prop key="socket.receive.buffer.bytes">${socket.receive.buffer.bytes}</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">${fetch.message.max.bytes}</prop>
                <prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
            </props>
        </property>
    </bean>

    <!-- ZooKeeper connection referenced by the consumer context. -->
    <int-kafka:zookeeper-connect
            id="zookeeperConnect"
            zk-connect="${zookeeper.servers}"
            zk-connection-timeout="${zookeeper.connection.timeout}"
            zk-session-timeout="${zookeeper.session.timeout}"
            zk-sync-time="${zookeeper.sync.time}"/>

    <!--
        Application bean with explicit init/destroy callbacks.
        NOTE(review): presumably this is what starts the inbound adapter,
        which is declared with auto-startup="false" below; confirm in
        api.utils.KafkaConsumerStarter.
    -->
    <bean id="kafkaThreadListener"
          class="api.utils.KafkaConsumerStarter"
          init-method="initIt"
          destroy-method="cleanUp"/>

    <!-- Polled inbound adapter (not started automatically) reading via "consumerContextFCM". -->
    <int-kafka:inbound-channel-adapter
            id="kafka-inbound-channel-adapter-FCM"
            channel="ip-chanel-trigger-fcm-notification"
            kafka-consumer-context-ref="consumerContextFCM"
            auto-startup="false">
        <int:poller fixed-delay="1000"
                    time-unit="MILLISECONDS"
                    receive-timeout="0"/>
    </int-kafka:inbound-channel-adapter>

    <!-- Consumer -->

    <!-- Avro reflect decoder constructed for common.vo.NotificationVo. -->
    <bean id="fcmNotificationDecoder"
          class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
        <constructor-arg value="common.vo.NotificationVo"/>
    </bean>

    <int-kafka:consumer-context
            id="consumerContextFCM"
            zookeeper-connect="zookeeperConnect"
            consumer-properties="consumerProperties"
            consumer-timeout="4000">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="trigger-fcm-notification"
                    value-decoder="fcmNotificationDecoder"
                    max-messages="50">
                <int-kafka:topic id="trigger-fcm-notification" streams="10"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

</beans>

如何将其更改为适用于 2.1.0.RELEASE 的配置?

~~~~~~~~~~~~~~~

在此处编辑:

我已参考文档并按自己的需求修改了 xml。但在读取消费者记录时遇到了一个小问题:我得到的 payload 如下

{
kafka_offset=7, 
kafka_receivedMessageKey=null, 
kafka_receivedPartitionId=0, 
kafka_receivedTopic=trigger-fcm-notification, 
kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 7, CreateTime = 1476864644264, checksum = 3680317883, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@203c8c95)
}

我需要值 (NotificationVo) 以便在消费者中进一步使用。如何将其作为有效负载的一部分?

~~~~~~~~~~~~~~~

在此处编辑:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Integration Kafka wiring for the "trigger-fcm-notification" topic,
    built on a KafkaTemplate (outbound) and a KafkaMessageListenerContainer
    feeding a message-driven channel adapter (inbound).
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- ===================== Outbound (producer) ===================== -->

    <!-- Channel drained by the outbound adapter declared right below. -->
    <int:publish-subscribe-channel id="inputToKafka"/>

    <!-- Publishes messages from "inputToKafka" to the topic through the "template" bean. -->
    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            channel="inputToKafka"
            kafka-template="template"
            topic="trigger-fcm-notification"
            auto-startup="true"
            order="1">
        <!-- The send is wrapped in a circuit-breaker advice with its default settings. -->
        <int-kafka:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice"/>
        </int-kafka:request-handler-advice-chain>
    </int-kafka:outbound-channel-adapter>

    <!--Producer-->
    <!-- KafkaTemplate backed by an inline producer factory; keys are Strings, values use the project serializer. -->
    <bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092"/>
                        <entry key="retries" value="5"/>
                        <entry key="batch.size" value="16384"/>
                        <entry key="linger.ms" value="1"/>
                        <entry key="buffer.memory" value="33554432"/>
                        <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                        <entry key="value.serializer" value="common.vo.NotificationVoSerializer"/>
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>

    <!-- ===================== Inbound (consumer) ===================== -->

    <!-- Forwards each record received by "container1" to the inbound channel, one message per record. -->
    <int-kafka:message-driven-channel-adapter
            id="kafka-inbound-channel-adapter-FCM"
            channel="ip-chanel-trigger-fcm-notification"
            listener-container="container1"
            message-converter="messageConverter"
            mode="record"
            auto-startup="true"
            phase="100"
            send-timeout="5000"/>

    <!-- Converter referenced by the message-driven adapter above. -->
    <bean id="messageConverter"
          class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

    <!-- Hands messages arriving on the inbound channel to the "fcmNotificationConsumer" bean. -->
    <int:service-activator
            input-channel="ip-chanel-trigger-fcm-notification"
            ref="fcmNotificationConsumer"/>

    <!--Consumer-->
    <!-- Listener container: inline consumer factory plus the topic to subscribe to. -->
    <bean id="container1"
          class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092"/>
                        <!--
                            NOTE(review): auto.commit.interval.ms is set although
                            enable.auto.commit is false; presumably it has no
                            effect in that combination - confirm against the
                            Kafka consumer configuration documentation.
                        -->
                        <entry key="enable.auto.commit" value="false"/>
                        <entry key="auto.commit.interval.ms" value="100"/>
                        <entry key="session.timeout.ms" value="15000"/>
                        <entry key="group.id" value="trigger-fcm-notification"/>
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <entry key="value.deserializer" value="common.vo.NotificationVoDeserializer"/>
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>

        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="trigger-fcm-notification"/>
            </bean>
        </constructor-arg>
    </bean>

</beans>

这是修改后的 xml 配置文件

~~~~~~~~~~~~~~~

在此处编辑:

消费者类:

package common.notification.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import common.vo.NotificationVo;

@Component
public class FcmNotificationConsumer {

    /**
     * Service-activator entry point that dumps the received map to standard output.
     *
     * <p>Prints the whole map, then looks up and prints the value stored under
     * the last key returned by iteration. Every key is cast to {@code String}
     * along the way.
     *
     * <p>NOTE(review): in the run shown with this class, the map printed here
     * contains the Kafka message headers (kafka_offset, kafka_receivedTopic, ...)
     * rather than the record value.
     *
     * @param payload the map handed to this method by the framework
     */
    @SuppressWarnings("unchecked")
    @ServiceActivator
    public <K, V> void process(Map<K, V> payload) {

        String lastKey = null;
        System.out.println("payload=====>" + payload.toString());

        // Visit every entry; whichever key is seen last is the one that is kept.
        for (Map.Entry<K, V> entry : payload.entrySet()) {
            lastKey = (String) entry.getKey();
        }

        // Value stored under the last key (the acknowledgment header in the sample run).
        final Object ackObject = payload.get(lastKey);
        System.out.println("ackObject=====>" + payload.get(lastKey));
    }
}

O/P:

payload=====>{kafka_offset=21, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)}

ackObject=====>Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)

~~~~~~~~~~~~~~~

在此处编辑:

更改 Consumer 中的方法参数后,收到了预期类型的有效负载(NotificationVo)。

package common.notification.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import common.vo.NotificationVo;

@Component
public class FcmNotificationConsumer {

    /**
     * Service-activator entry point that receives the complete message.
     *
     * <p>Prints the message (payload and headers), then casts the payload to
     * {@link NotificationVo}. The cast fails with a {@code ClassCastException}
     * if the payload is of another type.
     *
     * @param message the incoming message; its payload is expected to be a NotificationVo
     */
    @SuppressWarnings("unchecked")
    @ServiceActivator
    public void process(Message<?> message) {
        System.out.println("Message=====>" + message);

        // The record value arrives as the message payload.
        final Object rawPayload = message.getPayload();
        final NotificationVo notification = (NotificationVo) rawPayload;
    }
}

O/P:

Message=====>GenericMessage [payload=common.vo.NotificationVo@4c144e99, headers={kafka_offset=16, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 16, CreateTime = 1476949945607, checksum = 2501578118, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@4c144e99)}]

终于按预期工作了。

非常感谢支持

好的。现在比以前干净多了。

也就是说,您当前的代码是针对 Apache Kafka 0.8 编写的。从 0.9 版开始,Kafka 客户端采用了完全不同的设计。因此,基于 Spring Integration Kafka 1.0 的现有配置必须全部弃用并重写。

您应该了解 Apache Kafka 0.10:https://kafka.apache.org/documentation

以及基于 Kafka Client 0.10 的 Spring Kafka:http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/

并请留意其中关于 Spring Integration Kafka 的章节:http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/_spring_integration.html

请注意:这里没有人会替您完成这项工作。

编辑

我不确定你为什么说你得到的是这样的 payload,因为那恰恰是 headers。ConsumerRecord 中的 value 会被转换为消息的 payload。

请分享您接收消息并尝试提取该 value 的代码。

<int-kafka:message-driven-channel-adapter> 会生成带有 headers 和 payload 的 Message<?>,其中 payload 来自转换后的 ConsumerRecord 的 value。