如何在 spring-integration-kafka 2.1.0.RELEASE 和 Kafka 0.10.0 中为不同的主题配置不同的生产者?

How to config different Producer for different topic in spring-integration-kafka 2.1.0.RELEASE and Kafka 0.10.0?

将 Kafka 从 0.9.0 升级到 0.10.0 时,在为不同的主题配置不同的生产者时遇到问题。 XML 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
 xmlns:task="http://www.springframework.org/schema/task"
 xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
  http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

 <int:publish-subscribe-channel id="inputToKafka" />

 <!-- Producer Config -->

 <int-kafka:outbound-channel-adapter
  id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification"
  auto-startup="true" channel="inputToKafka">
  <int-kafka:request-handler-advice-chain>
   <bean
    class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
  </int-kafka:request-handler-advice-chain>
 </int-kafka:outbound-channel-adapter>
 
 <int-kafka:outbound-channel-adapter
  id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account"
  auto-startup="true" channel="inputToKafka">
  <int-kafka:request-handler-advice-chain>
   <bean
    class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
  </int-kafka:request-handler-advice-chain>
 </int-kafka:outbound-channel-adapter>
 
 <bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate">
  <constructor-arg>
   <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
     <map>
      <entry key="bootstrap.servers" value="localhost:9092" />
      <entry key="retries" value="0" />
      <entry key="batch.size" value="16384" />      
      <entry key="linger.ms" value="0" />
      <entry key="buffer.memory" value="33554432" />
      <entry key="key.serializer"
       value="org.apache.kafka.common.serialization.StringSerializer" />
      <entry key="value.serializer"
       value="common.serializer.FcmNotificationVoSerializer" />
     </map>
    </constructor-arg>
   </bean>
  </constructor-arg>
 </bean>

 <bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate">
  <constructor-arg>
   <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
     <map>
      <entry key="bootstrap.servers" value="localhost:9092" />
      <entry key="retries" value="0" />
      <entry key="batch.size" value="16384" />
      <entry key="buffer.memory" value="33554432" /> 
      <entry key="linger.ms" value="0" />
      <entry key="key.serializer"
       value="org.apache.kafka.common.serialization.StringSerializer" />
      <entry key="value.serializer"
       value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" />
     </map>
    </constructor-arg>
   </bean>
  </constructor-arg>
 </bean>

 <int-kafka:message-driven-channel-adapter
  id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer"
  auto-startup="true" phase="100" send-timeout="5000"
  channel="ip-chanel-trigger-fcm-notification" mode="record"
  message-converter="messageConverter" />

 <int-kafka:message-driven-channel-adapter
  id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer"
  auto-startup="true" phase="100" send-timeout="5000"
  channel="ip-chanel-sync-microsoft-account" mode="record"
  message-converter="messageConverter" />
  
 <bean id="messageConverter"
  class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

 <!-- Consumer Config -->
 <int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
  ref="fcmNotificationConsumer">
 </int:service-activator>
 
 <int:service-activator input-channel="ip-chanel-sync-microsoft-account"
  ref="syncMicrosoftAccountConsumer">
 </int:service-activator>
 
 <bean id="fcmContainer"
  class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
  <constructor-arg>
   <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
     <map>
      <entry key="bootstrap.servers" value="localhost:9092" />
      <entry key="enable.auto.commit" value="true" />
      <entry key="auto.commit.interval.ms" value="100" />
      <entry key="session.timeout.ms" value="15000" />
      <entry key="group.id" value="trigger-fcm-notification" />
      <entry key="key.deserializer"
       value="org.apache.kafka.common.serialization.StringDeserializer" />
      <entry key="value.deserializer"
       value="common.deserializer.FcmNotificationVoDeserializer" />
     </map>
    </constructor-arg>
   </bean>
  </constructor-arg>
  <constructor-arg>
   <bean class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics" value="trigger-fcm-notification" />
   </bean>
  </constructor-arg>
 </bean>
  
 <bean id="microsoftAccountSyncContainer"
  class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
  <constructor-arg>
   <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
     <map>
      <entry key="bootstrap.servers" value="localhost:9092" />
      <entry key="enable.auto.commit" value="true" />
      <entry key="auto.commit.interval.ms" value="100" />
      <entry key="session.timeout.ms" value="15000" />
      <entry key="group.id" value="sync-microsoft-account" />
      <entry key="key.deserializer"
       value="org.apache.kafka.common.serialization.StringDeserializer" />
      <entry key="value.deserializer"
       value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" />
     </map>
    </constructor-arg>
   </bean>
  </constructor-arg>
  <constructor-arg>
   <bean class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics" value="sync-microsoft-account" />
   </bean>
  </constructor-arg>
 </bean>

</beans>

分别发布2个主题时出错。堆栈轨迹如下

(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
 at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
 at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.execute(AbstractRequestHandlerAdvice.java:75)
 at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62)
 at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
 at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
 at com.sun.proxy.$Proxy52.handleMessage(Unknown Source)
 at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)
 at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185)
 at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
 at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191)
 at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366)
 at service.TenantDocumentsController.update(TenantDocumentsController.java:277)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
 at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
 at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
 at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)

定义了两个独立的序列化器和反序列化器class。但是它如何在内部引用其他class? 我是否遗漏了任何配置?

自从你发送到Kafka后,就没有Deserializer的主题了。根据您的 StackTrace,您执行了一些 REST 服务,该服务将 GcmNotificationVo 对象发送到 inputToKafka

这里第二个订阅者无法使用 common.serializer.MicrosoftAccountSyncRequestVoSerializer.

执行该对象的 Kafka 序列化

也许您的想法是使用 masOutboundChannelAdapter 进行不同的操作?因此,一个新的单独的 channel.