如何在 spring-integration-kafka 2.1.0.RELEASE 和 Kafka 0.10.0 中为不同的主题配置不同的生产者?
How to config different Producer for different topic in spring-integration-kafka 2.1.0.RELEASE and Kafka 0.10.0?
将 Kafka 从 0.9.0 升级到 0.10.0 时,在为不同的主题配置不同的生产者时遇到问题。 XML 配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:publish-subscribe-channel id="inputToKafka" />
<!-- Producer Config -->
<int-kafka:outbound-channel-adapter
id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification"
auto-startup="true" channel="inputToKafka">
<int-kafka:request-handler-advice-chain>
<bean
class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
<int-kafka:outbound-channel-adapter
id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account"
auto-startup="true" channel="inputToKafka">
<int-kafka:request-handler-advice-chain>
<bean
class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
<bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="retries" value="0" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="0" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="common.serializer.FcmNotificationVoSerializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="retries" value="0" />
<entry key="batch.size" value="16384" />
<entry key="buffer.memory" value="33554432" />
<entry key="linger.ms" value="0" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<int-kafka:message-driven-channel-adapter
id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer"
auto-startup="true" phase="100" send-timeout="5000"
channel="ip-chanel-trigger-fcm-notification" mode="record"
message-converter="messageConverter" />
<int-kafka:message-driven-channel-adapter
id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer"
auto-startup="true" phase="100" send-timeout="5000"
channel="ip-chanel-sync-microsoft-account" mode="record"
message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!-- Consumer Config -->
<int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
ref="fcmNotificationConsumer">
</int:service-activator>
<int:service-activator input-channel="ip-chanel-sync-microsoft-account"
ref="syncMicrosoftAccountConsumer">
</int:service-activator>
<bean id="fcmContainer"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="true" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="trigger-fcm-notification" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="common.deserializer.FcmNotificationVoDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="trigger-fcm-notification" />
</bean>
</constructor-arg>
</bean>
<bean id="microsoftAccountSyncContainer"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="true" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="sync-microsoft-account" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="sync-microsoft-account" />
</bean>
</constructor-arg>
</bean>
</beans>
分别发布2个主题时出错。堆栈轨迹如下
(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.execute(AbstractRequestHandlerAdvice.java:75)
at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy52.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191)
at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366)
at service.TenantDocumentsController.update(TenantDocumentsController.java:277)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
定义了两个独立的序列化器和反序列化器class。但是它如何在内部引用其他class?
我是否遗漏了任何配置?
自从你发送到Kafka后,就没有Deserializer
的主题了。根据您的 StackTrace,您执行了一些 REST 服务,该服务将 GcmNotificationVo
对象发送到 inputToKafka
。
这里第二个订阅者无法使用 common.serializer.MicrosoftAccountSyncRequestVoSerializer
.
执行该对象的 Kafka 序列化
也许您的想法是使用 masOutboundChannelAdapter
进行不同的操作?因此,一个新的单独的 channel
.
将 Kafka 从 0.9.0 升级到 0.10.0 时,在为不同的主题配置不同的生产者时遇到问题。 XML 配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:publish-subscribe-channel id="inputToKafka" />
<!-- Producer Config -->
<int-kafka:outbound-channel-adapter
id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification"
auto-startup="true" channel="inputToKafka">
<int-kafka:request-handler-advice-chain>
<bean
class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
<int-kafka:outbound-channel-adapter
id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account"
auto-startup="true" channel="inputToKafka">
<int-kafka:request-handler-advice-chain>
<bean
class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
<bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="retries" value="0" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="0" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="common.serializer.FcmNotificationVoSerializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="retries" value="0" />
<entry key="batch.size" value="16384" />
<entry key="buffer.memory" value="33554432" />
<entry key="linger.ms" value="0" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<int-kafka:message-driven-channel-adapter
id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer"
auto-startup="true" phase="100" send-timeout="5000"
channel="ip-chanel-trigger-fcm-notification" mode="record"
message-converter="messageConverter" />
<int-kafka:message-driven-channel-adapter
id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer"
auto-startup="true" phase="100" send-timeout="5000"
channel="ip-chanel-sync-microsoft-account" mode="record"
message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!-- Consumer Config -->
<int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
ref="fcmNotificationConsumer">
</int:service-activator>
<int:service-activator input-channel="ip-chanel-sync-microsoft-account"
ref="syncMicrosoftAccountConsumer">
</int:service-activator>
<bean id="fcmContainer"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="true" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="trigger-fcm-notification" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="common.deserializer.FcmNotificationVoDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="trigger-fcm-notification" />
</bean>
</constructor-arg>
</bean>
<bean id="microsoftAccountSyncContainer"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="true" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="sync-microsoft-account" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="sync-microsoft-account" />
</bean>
</constructor-arg>
</bean>
</beans>
分别发布2个主题时出错。堆栈轨迹如下
(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.execute(AbstractRequestHandlerAdvice.java:75)
at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy52.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191)
at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366)
at service.TenantDocumentsController.update(TenantDocumentsController.java:277)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
定义了两个独立的序列化器和反序列化器class。但是它如何在内部引用其他class? 我是否遗漏了任何配置?
自从你发送到Kafka后,就没有Deserializer
的主题了。根据您的 StackTrace,您执行了一些 REST 服务,该服务将 GcmNotificationVo
对象发送到 inputToKafka
。
这里第二个订阅者无法使用 common.serializer.MicrosoftAccountSyncRequestVoSerializer
.
也许您的想法是使用 masOutboundChannelAdapter
进行不同的操作?因此,一个新的单独的 channel
.