Integrating Kafka producer and consumer inside a web application

What is the best strategy for integrating a Kafka producer and consumer inside a Tomcat web application?

I am using the latest version of spring-integration-kafka.

Thanks

When I use the same code in Spring MVC, I am unable to produce messages. However, it works when used in a standalone Java program.

2015-03-06 15:46:46 DEBUG QueueChannel:383 - postSend (sent=true) on channel 'inputToKafka', message: GenericMessage [payload=EventLogEvent{key=com.springapp.mvc.util.EventLogEventKey@cb5d2218, numberOfEvents=10, resetCounterInDays=5, template=6, proposition=5}, headers={timestamp=1425653206240, id=eccb00b0-6617-b5c4-6de0-45a08730041e, messageKey=1, kafka_topic=test-hs}]
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:1610 - Invoking afterPropertiesSet() on bean with name 'hello'
2015-03-06 15:46:46 DEBUG TestDispatcherServlet:1225 - Rendering view [org.springframework.web.servlet.view.InternalResourceView: name 'hello'; URL [/WEB-INF/pages/hello.jsp]] in DispatcherServlet with name ''
2015-03-06 15:46:46 DEBUG InternalResourceView:432 - Added model object 'message' of type [java.lang.String] to request in view with name 'hello'
2015-03-06 15:46:46 DEBUG InternalResourceView:166 - Forwarding to resource [/WEB-INF/pages/hello.jsp] in InternalResourceView 'hello'
2015-03-06 15:46:46 DEBUG MockRequestDispatcher:67 - MockRequestDispatcher: forwarding to [/WEB-INF/pages/hello.jsp]
2015-03-06 15:46:46 DEBUG TestDispatcherServlet:996 - Successfully completed request
2015-03-06 15:46:46 DEBUG DirtiesContextTestExecutionListener:86 - After test method: context [DefaultTestContext@68af6fa0 testClass = AppTests, testInstance = com.springapp.mvc.AppTests@55c9a711, testMethod = simple@AppTests, testException = [null], mergedContextConfiguration = [WebMergedContextConfiguration@70e08379 testClass = AppTests, locations = '{file:src/main/webapp/WEB-INF/mvc-dispatcher-servlet.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', resourceBasePath = 'src/main/webapp', contextLoader = 'org.springframework.test.context.web.WebDelegatingSmartContextLoader', parent = [null]]], class dirties context [false], class mode [null], method dirties context [false].
2015-03-06 15:46:46 DEBUG ServletTestExecutionListener:147 - Resetting RequestContextHolder for test context [DefaultTestContext@68af6fa0 testClass = AppTests, testInstance = com.springapp.mvc.AppTests@55c9a711, testMethod = simple@AppTests, testException = [null], mergedContextConfiguration = [WebMergedContextConfiguration@70e08379 testClass = AppTests, locations = '{file:src/main/webapp/WEB-INF/mvc-dispatcher-servlet.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', resourceBasePath = 'src/main/webapp', contextLoader = 'org.springframework.test.context.web.WebDelegatingSmartContextLoader', parent = [null]]].
2015-03-06 15:46:46 DEBUG DirtiesContextTestExecutionListener:118 - After test class: context [DefaultTestContext@68af6fa0 testClass = AppTests, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [WebMergedContextConfiguration@70e08379 testClass = AppTests, locations = '{file:src/main/webapp/WEB-INF/mvc-dispatcher-servlet.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', resourceBasePath = 'src/main/webapp', contextLoader = 'org.springframework.test.context.web.WebDelegatingSmartContextLoader', parent = [null]]], dirtiesContext [false].
2015-03-06 15:46:46 INFO  GenericWebApplicationContext:862 - Closing org.springframework.web.context.support.GenericWebApplicationContext@5f0dd6db: startup date [Fri Mar 06 15:46:45 CET 2015]; root of context hierarchy 
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean 'globalChannelInterceptorProcessor'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean 'kafkaOutboundChannelAdapter'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean 'kafkaProducerContext'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:247 - Returning cached instance of singleton bean 'lifecycleProcessor'
2015-03-06 15:46:46 INFO  DefaultLifecycleProcessor:356 - Stopping beans in phase 1073741823
2015-03-06 15:46:46 INFO  DefaultLifecycleProcessor:356 - Stopping beans in phase 0
2015-03-06 15:46:46 DEBUG DefaultLifecycleProcessor:226 - Asking bean 'kafkaProducerContext' of type [class org.springframework.integration.kafka.support.KafkaProducerContext] to stop
2015-03-06 15:46:46 INFO  Producer:68 - Shutting down producer
2015-03-06 15:46:46 INFO  ProducerPool:68 - Closing all sync producers
2015-03-06 15:46:46 DEBUG DefaultLifecycleProcessor:235 - Bean 'kafkaProducerContext' completed its stop procedure
2015-03-06 15:46:46 DEBUG DefaultLifecycleProcessor:226 - Asking bean '_org.springframework.integration.errorLogger' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] to stop
2015-03-06 15:46:46 INFO  EventDrivenConsumer:83 - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-03-06 15:46:46 INFO  PublishSubscribeChannel:69 - Channel 'org.springframework.web.context.support.GenericWebApplicationContext@5f0dd6db.errorChannel' has 0 subscriber(s).
2015-03-06 15:46:46 DEBUG DefaultLifecycleProcessor:235 - Bean '_org.springframework.integration.errorLogger' completed its stop procedure
2015-03-06 15:46:46 INFO  EventDrivenConsumer:131 - stopped _org.springframework.integration.errorLogger
2015-03-06 15:46:46 INFO  DefaultLifecycleProcessor:356 - Stopping beans in phase -2147483648
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:499 - Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@53164a3b: defining beans [channelInitializer,$autoCreateChannelCandidates,IntegrationConfigurationBeanFactoryPostProcessor,integrationEvaluationContext,org.springframework.integration.expression.IntegrationEvaluationContextAwareBeanPostProcessor#0,integrationGlobalProperties,integrationHeaderChannelRegistry,globalChannelInterceptorProcessor,jsonPath,xpath,toStringFriendlyJsonNodeToStringConverter,converterRegistrar,integrationConversionService,DefaultConfiguringBeanFactoryPostProcessor,datatypeChannelMessageConverter,messageBuilderFactory,inputToKafka,org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0,org.springframework.scheduling.support.PeriodicTrigger#0,kafkaOutboundChannelAdapter,taskExecutor,kafkaSpecificEncoder,producerProperties,kafkaProducerContext,helloController,kafkaProducer,org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.web.servlet.view.InternalResourceViewResolver#0,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor,org.springframework.context.annotation.ConfigurationClassPostProcessor.enhancedConfigurationProcessor,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:555 - Retrieved dependent beans for bean '(inner bean)#220d52b8': [_org.springframework.integration.errorLogger]
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:555 - Retrieved dependent beans for bean '(inner bean)#40452b0f': [kafkaOutboundChannelAdapter]
2015-03-06 15:46:46 DEBUG DisposableBeanAdapter:245 - Invoking destroy() on bean with name 'taskExecutor'
2015-03-06 15:46:46 INFO  ThreadPoolTaskExecutor:203 - Shutting down ExecutorService
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:555 - Retrieved dependent beans for bean '(inner bean)#2051acec': [kafkaProducerContext]
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:555 - Retrieved dependent beans for bean '(inner bean)#1d79997b': [(inner bean)#2051acec]
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:555 - Retrieved dependent beans for bean '(inner bean)#1d79997b#1': [(inner bean)#7779ad44]
2015-03-06 15:46:46 DEBUG DisposableBeanAdapter:245 - Invoking destroy() on bean with name 'taskScheduler'
2015-03-06 15:46:46 INFO  ThreadPoolTaskScheduler:203 - Shutting down ExecutorService 'taskScheduler'
2015-03-06 15:46:46 DEBUG DefaultListableBeanFactory:555 - Retrieved dependent beans for bean '(inner bean)#77ce5421': [taskScheduler]

The same code is pasted below. Two resources are defined: one uses the Kafka native Java API, and it works. The other uses spring-integration, and it does not work.

    <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
   xmlns:task="http://www.springframework.org/schema/task"
   xmlns:mvc="http://www.springframework.org/schema/mvc"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
     http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
    ">


<context:component-scan base-package="com.springapp.mvc"/>
 <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
    <property name="prefix" value="/WEB-INF/pages/"/>
    <property name="suffix" value=".jsp"/>
</bean>
<int:channel id="inputToKafka">
    <int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    order="1">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="10" keep-alive="120" queue-capacity="500"/>

<bean id="producerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
            <prop key="message.send.max.retries">5</prop>
            <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
            <prop key="request.required.acks">1</prop>
        </props>
    </property>
</bean>
<int-kafka:producer-context id="kafkaProducerContext"
                            producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="172.16.1.13:9092"
                                          topic="test-hs"
                                         value-class-type="java.lang.Object"
                                          compression-codec="default"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
</beans>

Controller class

package com.springapp.mvc;
import com.springapp.mvc.util.EventLogEvent;
import com.springapp.mvc.util.EventLogEventKey;
import com.springapp.mvc.util.KafkaEventLogWriterChannel;
import com.springapp.mvc.util.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.kafka.support.KafkaProducerContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseStatus;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Controller
@RequestMapping("/")
public class KafkaController {

@Autowired
private KafkaProducer kafkaProducer;



@RequestMapping(method = RequestMethod.GET)
public String printWelcome(ModelMap model) {
    model.addAttribute("message", "Hello world!");

    kafkaProducer.sendMessageToKafka();
    return "hello";
}

@RequestMapping(value = "/native", method = RequestMethod.PUT)
@ResponseStatus(HttpStatus.NO_CONTENT)
public void nativeAPI(@RequestBody String bookCase) throws Exception {


    KafkaEventLogWriterChannel ch = new KafkaEventLogWriterChannel("test-hs");
    Map<String, String> data = new HashMap<String, String>();
    data.put("kafka.api", "native");
    ch.writeData(data);
}

@RequestMapping(value = "/spring", method = RequestMethod.PUT)
@ResponseStatus(HttpStatus.NO_CONTENT)
public void springIntegration(@RequestBody String bookCase) throws Exception     {

    kafkaProducer.sendMessageToKafka();

}
}

I am using the spring-integration 1.0 API.

Class that sends the message
package com.springapp.mvc.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by hs on 06-03-2015.
*/
@Component
public class KafkaProducer {

@Autowired
@Qualifier("inputToKafka")
MessageChannel inputToKafka;


public void sendMessageToKafka()
{

    Map<String, String> data = new HashMap<String, String>();
    data.put("kafka.api", "spring-integration");

    boolean status = inputToKafka.send(
            org.springframework.integration.support.MessageBuilder.withPayload(data)
                    .setHeader("messageKey", String.valueOf(1))
                    .setHeader(KafkaHeaders.TOPIC, "test-hs")
                    .build());
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }


}

}

Please be more specific.

Spring Integration Kafka support is just an extension of Spring Integration, which in turn is an extension of the Spring Framework.

Since you can simply implement a Spring MVC web application, there is nothing stopping you from adding any other integration to it, such as Kafka.

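Since Spring Integration implements EIP, provides adapters for most well-known protocols, and connects everything through the MessageChannel and Message abstractions, you can simply accept an HTTP request with an HTTP inbound endpoint and send it on to the <int-kafka:outbound-channel-adapter>.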
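To make the "everything is connected by channels" idea concrete, here is a minimal plain-Java model of that flow. It is only a sketch: it uses no Spring or Kafka classes, and every name in it (SketchMessage, SketchChannel, FakeKafkaOutbound, handleHttpPut) is hypothetical. The kafka_topic header name and the test-hs topic are taken from the log and configuration in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-Java model of "HTTP in -> channel -> Kafka out"; all names are hypothetical. */
final class HttpToKafkaSketch {

    /** A payload plus headers, loosely mirroring the Message abstraction. */
    static final class SketchMessage {
        final String payload;
        final Map<String, String> headers;

        SketchMessage(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** A direct channel: send() hands the message to the subscribed handler at once. */
    static final class SketchChannel {
        private Consumer<SketchMessage> handler;

        void subscribe(Consumer<SketchMessage> handler) {
            this.handler = handler;
        }

        boolean send(SketchMessage message) {
            if (handler == null) {
                return false; // nobody is subscribed, so nothing is delivered
            }
            handler.accept(message);
            return true;
        }
    }

    /** Stand-in for the outbound adapter: records "topic:payload" instead of calling Kafka. */
    static final class FakeKafkaOutbound implements Consumer<SketchMessage> {
        final List<String> sent = new ArrayList<>();

        @Override
        public void accept(SketchMessage m) {
            sent.add(m.headers.get("kafka_topic") + ":" + m.payload);
        }
    }

    /** Stand-in for the HTTP inbound side: wraps a request body and sends it downstream. */
    static boolean handleHttpPut(String body, SketchChannel channel) {
        Map<String, String> headers = new HashMap<>();
        headers.put("kafka_topic", "test-hs"); // topic name taken from the question
        return channel.send(new SketchMessage(body, headers));
    }

    /** Wires the three pieces together and returns what the fake outbound side recorded. */
    static List<String> run(String body) {
        SketchChannel channel = new SketchChannel();
        FakeKafkaOutbound outbound = new FakeKafkaOutbound();
        channel.subscribe(outbound);
        handleHttpPut(body, channel);
        return outbound.sent;
    }

    public static void main(String[] args) {
        System.out.println(run("hello")); // prints [test-hs:hello]
    }
}
```

The point of the model is that the HTTP side and the Kafka side never reference each other; they only share a channel, which is why either end can be swapped for another adapter.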

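The same works in the other direction, from Kafka to an HTTP client, and you can do it through any other adapter and protocol as well, such as JDBC (to store Kafka messages) or WebSockets (to push Kafka messages to connected clients).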

So far, I don't see any obstacle to building whatever integration solution you need in a web application based on the Spring portfolio.

I had the same problem and solved it by changing auto-startup from false to true:

auto-startup="true"

in the adapter definition, which originally looked like this:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                kafka-producer-context-ref="kafkaProducerContext"
                                auto-startup="false"
                                channel="inputToKafka"
                                order="1"


    >

It works!
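A likely explanation, offered as a hedged sketch rather than a reading of the Spring Integration source: inputToKafka is a queue channel, so send() only enqueues the message (the log in the question shows postSend (sent=true)), and the polled outbound adapter has to be running to drain the queue. With auto-startup="false" nothing starts the adapter, so the message just sits there. The plain-Java model below uses hypothetical class names (QueueChannelModel, PolledAdapterModel) and no Spring or Kafka classes; it only illustrates how a message can be "sent" yet never delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Simplified model of a queue channel drained by a polled adapter; not Spring's implementation. */
final class AutoStartupSketch {

    /** Stand-in for a queue-backed channel: send() only enqueues. */
    static final class QueueChannelModel {
        private final Queue<String> queue = new ConcurrentLinkedQueue<>();

        boolean send(String payload) {
            return queue.offer(payload); // succeeds whether or not anyone is polling
        }

        String receive() {
            return queue.poll(); // null when the queue is empty
        }
    }

    /** Stand-in for the polled outbound adapter. */
    static final class PolledAdapterModel {
        private final QueueChannelModel channel;
        private final List<String> delivered = new ArrayList<>();
        private boolean running;

        PolledAdapterModel(QueueChannelModel channel, boolean autoStartup) {
            this.channel = channel;
            this.running = autoStartup; // assumption: started with the context only when true
        }

        void start() {
            running = true;
        }

        /** One poll cycle: drains the channel only while the adapter is running. */
        void pollOnce() {
            if (!running) {
                return;
            }
            String message;
            while ((message = channel.receive()) != null) {
                delivered.add(message);
            }
        }

        List<String> delivered() {
            return delivered;
        }
    }

    /** Sends one message, runs one poll cycle, and returns how many messages were delivered. */
    static int deliveredAfterOnePoll(boolean autoStartup) {
        QueueChannelModel channel = new QueueChannelModel();
        PolledAdapterModel adapter = new PolledAdapterModel(channel, autoStartup);
        channel.send("hello");
        adapter.pollOnce();
        return adapter.delivered().size();
    }

    public static void main(String[] args) {
        System.out.println(deliveredAfterOnePoll(false)); // prints 0
        System.out.println(deliveredAfterOnePoll(true));  // prints 1
    }
}
```

In this model, calling start() on the adapter before polling has the same effect as constructing it with autoStartup set to true, which mirrors the two ways of getting a stopped endpoint running.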