Spring-Kafka Integration 1.0.0.RELEASE Producer 问题

Spring-Kafka Integration 1.0.0.RELEASE Issue with Producer

我无法使用 Spring Kafka 集成发布消息,尽管我的 Kafka Java 客户端工作正常。

Java 代码在 Windows 上 运行,Kafka 在 Linux 框上 运行。

 KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>("test-cass");
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    Encoder<String> encoder = new StringEncoder<String>();
    producerMetadata.setValueEncoder(encoder);
    producerMetadata.setKeyEncoder(encoder);

    ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, "172.16.1.42:9092");

    ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap("test-cass", config));
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<String, String>(kafkaProducerContext);
    handler.handleMessage(MessageBuilder.withPayload("foo")
            .setHeader("messagekey", "3")
            .setHeader("topic", "test-cass")
            .build());

我收到以下错误

"C:\Program Files\Java\jdk1.7.0_71\bin\java" -Didea.launcher.port=7542 "-Didea.launcher.bin.path=C:\Program Files (x86)\JetBrains\IntelliJ IDEA 13.1.6\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.7.0_71\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jce.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jfxrt.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\resources.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\rt.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\zipfs.jar;C:\projects\SpringCassandraInt\target\classes;C:\Users\hs\.m2\repository\org\springframework\data\spring-data-cassandra.1.2.RELEASE\spring-data-cassandra-1.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\data\spring-cql.1.2.RELEASE\spring-cql-1.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-context.1.4.RELEASE\spring-context-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-aop.1.4.RELEASE\spring-aop-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\aopalliance\aopalliance.0\aopalliance-1.0.jar;C:\Users\hs\.m2\repository\org\springframework\spring-beans.0.9.RELEASE\spring-beans-4.0.9.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-core.1.2.RELEASE\spring-core-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\commons-logging\commons-logging.1.3\commons-logging-1.1.3.jar;C:\Users\hs\.m2\repository\org\springframework\spring-expression.1.2.RELEASE\spring-expression-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-tx.1.4.RELEASE\spring-tx-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\data\spring-data-commons.9.2.RELEASE\spring-data-commons-1.9.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\slf4j\slf4j-api.7.10\slf4j-api-1.7.10.jar;C:\Users\hs\.m2\repository\org\slf4j\jcl-over-slf4j.7.10\jcl-over-slf4j-1.7.10.jar;C:\Users\hs\.m2\repository\com\datastax\cassandra\cassandra-driver-dse.0.4\cassandra-driver-dse-2.0.4.jar;C:\Users\hs\.m2\repository\com\datastax\cassandra\cassandra-driver-core.0.4\cassandra-driver-core-2.0.4.jar;C:\Users\hs\.m2\repository\io\netty\netty.9.0.Final\netty-3.9.0.Final.jar;C:\Users\hs\.m2\repository\com\codahale\metrics\metrics-core.0.2\metrics-core-3.0.2.jar;C:\Users\hs\.m2\repository\com\google\guava\guava.0\guava-15.0.jar;C:\Users\hs\.m2\repository\org\liquibase\liquibase-core.1.1\liquibase-core-3.1.1.jar;C:\Users\hs\.m2\repository\org\yaml\snakeyaml.13\snakeyaml-1.13.jar;C:\Users\hs\.m2\repository\ch\qos\logback\logback-classic.1.2\logback-classic-1.1.2.jar;C:\Users\hs\.m2\repository\ch\qos\logback\logback-core.1.2\logback-core-1.1.2.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-core.1.2.RELEASE\spring-integration-core-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\projectreactor\reactor-core.1.4.RELEASE\reactor-core-1.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\com\goldmansachs\gs-collections.0.0\gs-collections-5.0.0.jar;C:\Users\hs\.m2\repository\com\goldmansachs\gs-collections-api.0.0\gs-collections-api-5.0.0.jar;C:\Users\hs\.m2\repository\com\lmax\disruptor.2.1\disruptor-3.2.1.jar;C:\Users\hs\.m2\repository\io\gatling\jsr166e.0\jsr166e-1.0.jar;C:\Users\hs\.m2\repository\org\springframework\retry\spring-retry.1.1.RELEASE\spring-retry-1.1.1.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-messaging.1.4.RELEASE\spring-messaging-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-stream.1.2.RELEASE\spring-integration-stream-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-xml.1.2.RELEASE\spring-integration-xml-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-oxm.1.4.RELEASE\spring-oxm-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\ws\spring-xml.2.0.RELEASE\spring-xml-2.2.0.RELEASE.jar;C:\Users\hs\.m2\repository\com\jayway\jsonpath\json-path.2.0\json-path-1.2.0.jar;C:\Users\hs\.m2\repository\net\minidev\json-smart.1.0\json-smart-2.1.0.jar;C:\Users\hs\.m2\repository\net\minidev\asm.0.2\asm-1.0.2.jar;C:\Users\hs\.m2\repository\asm\asm.3.1\asm-3.3.1.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-kafka.0.0.RELEASE\spring-integration-kafka-1.0.0.RELEASE.jar;C:\Users\hs\.m2\repository\org\apache\avro\avro-compiler.7.6\avro-compiler-1.7.6.jar;C:\Users\hs\.m2\repository\org\apache\avro\avro.7.6\avro-1.7.6.jar;C:\Users\hs\.m2\repository\org\codehaus\jackson\jackson-core-asl.9.13\jackson-core-asl-1.9.13.jar;C:\Users\hs\.m2\repository\org\codehaus\jackson\jackson-mapper-asl.9.13\jackson-mapper-asl-1.9.13.jar;C:\Users\hs\.m2\repository\com\thoughtworks\paranamer\paranamer.3\paranamer-2.3.jar;C:\Users\hs\.m2\repository\org\xerial\snappy\snappy-java.0.5\snappy-java-1.0.5.jar;C:\Users\hs\.m2\repository\org\apache\commons\commons-compress.4.1\commons-compress-1.4.1.jar;C:\Users\hs\.m2\repository\org\tukaani\xz.0\xz-1.0.jar;C:\Users\hs\.m2\repository\commons-lang\commons-lang.6\commons-lang-2.6.jar;C:\Users\hs\.m2\repository\org\apache\velocity\velocity.7\velocity-1.7.jar;C:\Users\hs\.m2\repository\commons-collections\commons-collections.2.1\commons-collections-3.2.1.jar;C:\Users\hs\.m2\repository\com\yammer\metrics\metrics-annotation.2.0\metrics-annotation-2.2.0.jar;C:\Users\hs\.m2\repository\com\yammer\metrics\metrics-core.2.0\metrics-core-2.2.0.jar;C:\Users\hs\.m2\repository\org\apache\kafka\kafka_2.10[=13=].8.1.1\kafka_2.10-0.8.1.1.jar;C:\Users\hs\.m2\repository\org\apache\zookeeper\zookeeper.3.4\zookeeper-3.3.4.jar;C:\Users\hs\.m2\repository\log4j\log4j.2.15\log4j-1.2.15.jar;C:\Users\hs\.m2\repository\javax\mail\mail.4\mail-1.4.jar;C:\Users\hs\.m2\repository\javax\activation\activation.1\activation-1.1.jar;C:\Users\hs\.m2\repository\javax\jms\jms.1\jms-1.1.jar;C:\Users\hs\.m2\repository\com\sun\jdmk\jmxtools.2.1\jmxtools-1.2.1.jar;C:\Users\hs\.m2\repository\com\sun\jmx\jmxri.2.1\jmxri-1.2.1.jar;C:\Users\hs\.m2\repository\jline\jline[=13=].9.94\jline-0.9.94.jar;C:\Users\hs\.m2\repository\net\sf\jopt-simple\jopt-simple.2\jopt-simple-3.2.jar;C:\Users\hs\.m2\repository\org\scala-lang\scala-library.10.1\scala-library-2.10.1.jar;C:\Users\hs\.m2\repository\com1tec\zkclient[=13=].3\zkclient-0.3.jar;C:\Program Files (x86)\JetBrains\IntelliJ IDEA 13.1.6\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain com.agillic.dialogue.kafka.outbound.SpringKafkaTest
15:39:11.736 [main] INFO  o.s.i.k.support.ProducerFactoryBean - Using producer properties => {metadata.broker.list=172.16.1.42:9092, compression.codec=0}
2015-02-19 15:39:12 INFO  VerifiableProperties:68 - Verifying properties
2015-02-19 15:39:12 INFO  VerifiableProperties:68 - Property compression.codec is overridden to 0
2015-02-19 15:39:12 INFO  VerifiableProperties:68 - Property metadata.broker.list is overridden to 172.16.1.42:9092
15:39:12.164 [main] INFO  o.s.b.f.config.PropertiesFactoryBean - Loading properties file from URL [jar:file:/C:/Users/hs/.m2/repository/org/springframework/integration/spring-integration-core/4.1.2.RELEASE/spring-integration-core-4.1.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
15:39:12.208 [main] DEBUG o.s.i.k.o.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@5204db6b received message: GenericMessage [payload=foo, headers={timestamp=1424356752208, id=00c483d9-ecf8-2937-4a2c-985bd3afcae4, topic=test-cass, messagekey=3}]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@5204db6b]; nested exception is java.lang.NullPointerException
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at com.agillic.dialogue.kafka.outbound.SpringKafkaTest.main(SpringKafkaTest.java:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.NullPointerException
    at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:58)
    at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:190)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 6 more

Process finished with exit code 1

实际上,当我们引入 KafkaHeaders 时,我们做了适当的文档更改:https://github.com/spring-projects/spring-integration-kafka/blob/master/README.md。请参阅重要说明:

Since the last Milestone, we have introduced the KafkaHeaders interface with constants. The messageKey and topic default headers now require a kafka_ prefix. When migrating from an earlier version, you need to specify message-key-expression="headers.messageKey" and topic-expression="headers.topic" on the , or simply change the headers upstream to the new headers from KafkaHeaders using a or MessageBuilder. Or, of course, configure them on the adapter if you are using constant values.

更新

关于NullPointerException:这确实是个问题。欢迎大家踊跃投稿JIRA ticket and we'll take care of that. We are even welcome for the contribution!