使用 Camel-Kafka 的 ProducerTemplate sendBody() 方法时,无法为端点创建 Producer
Failed to create Producer for endpoint when using Camel-Kafka's ProducerTemplate sendBody() method
我正在测试使用 Apache Camel 在 Kafka (0.8.2.1) 上发送消息的简单生产者。我在骆驼中使用 java DSL 创建了端点。
CamelContext ctx =new DefaultCamelContext();
PropertiesComponent properties=new PropertiesComponent();
properties.setLocation("com/camel/test/props.properties");
ctx.addComponent("properties",properties);
final String uri= "kafka://{{kafka.host}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
String uriParams = "&metadata.broker.list={{metadata.broker.list}";
ctx.addRoutes(new RouteBuilder() {
public void configure() { //
from(uri+"&groupId={{groupId}}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(exchange.getIn().getBody());
}
})
;
}
});
ctx.start();
ProducerTemplate tmp = ctx.createProducerTemplate();
tmp.sendBody(ctx.getEndpoint(uri), "my test is working");// Error occurs here
现在我想使用 Apache Camel 提供的 ProducerTempalte 在 kafka 上发送消息。但是运行程序时出现以下错误
注意:Zookeeper 和 Kafka 已启动,可以使用 kafka 控制台 produce/consume 消息。
Exception in thread "main" org.apache.camel.FailedToCreateProducerException: Failed to create Producer for endpoint: Endpoint[kafka://localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181]. Reason: java.lang.NullPointerException
at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:407)
at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:220)
at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:343)
at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
at com.camel.test.CamelTest.main(CamelTest.java:45)
Caused by: java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:514)
at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:54)
at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:61)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:2869)
at org.apache.camel.impl.DefaultCamelContext.doAddService(DefaultCamelContext.java:1097)
at org.apache.camel.impl.DefaultCamelContext.addService(DefaultCamelContext.java:1058)
at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:405)
... 6 more
我猜这些属性没有为生产者设置,但不知道如何在生产者模板中设置。
uri 应该将代理列表作为服务器名称(不要因为我没有创建此组件的语法而责怪我)。
final String uri= "kafka://{{metadata.broker.list}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
我能够通过调试找到解决方案。默认情况下,ProducerTemplate 需要在创建新 object 时未设置的默认参数(这可能是 API 中的错误)。
所以我找到了一种通过 URI 发送参数的方法。以下参数是强制性的
- metadata.broker.list(作为 URI 参数)
- request.required.acks(已默认设置)
- producer.type(在这种情况下不需要,但其他 APIs 需要)
- serializer.class(已默认设置)
- 分区程序 (class)(作为 URI 参数)
- PARTITION_KEY(如Header)
我们没有为 Partition_key 发送参数的选项,因此需要在 Header 中添加它。所以使用sendBodyAndHeader方法发送producer消息
ProducerTemplate tmp = ctx.createProducerTemplate();
tmp.setDefaultEndpoint(ctx.getEndpoint(uri+"&partitioner={{partitioner.class}}"));
ctx.start();
tmp.sendBodyAndHeader("my test is working "+(new Random()).nextInt(100), KafkaConstants.PARTITION_KEY, 1);
tmp.stop();
ctx.stop();
我正在测试使用 Apache Camel 在 Kafka (0.8.2.1) 上发送消息的简单生产者。我在骆驼中使用 java DSL 创建了端点。
CamelContext ctx =new DefaultCamelContext();
PropertiesComponent properties=new PropertiesComponent();
properties.setLocation("com/camel/test/props.properties");
ctx.addComponent("properties",properties);
final String uri= "kafka://{{kafka.host}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
String uriParams = "&metadata.broker.list={{metadata.broker.list}";
ctx.addRoutes(new RouteBuilder() {
public void configure() { //
from(uri+"&groupId={{groupId}}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(exchange.getIn().getBody());
}
})
;
}
});
ctx.start();
ProducerTemplate tmp = ctx.createProducerTemplate();
tmp.sendBody(ctx.getEndpoint(uri), "my test is working");// Error occurs here
现在我想使用 Apache Camel 提供的 ProducerTempalte 在 kafka 上发送消息。但是运行程序时出现以下错误 注意:Zookeeper 和 Kafka 已启动,可以使用 kafka 控制台 produce/consume 消息。
Exception in thread "main" org.apache.camel.FailedToCreateProducerException: Failed to create Producer for endpoint: Endpoint[kafka://localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181]. Reason: java.lang.NullPointerException
at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:407)
at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:220)
at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:343)
at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
at com.camel.test.CamelTest.main(CamelTest.java:45)
Caused by: java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:514)
at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:54)
at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:61)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:2869)
at org.apache.camel.impl.DefaultCamelContext.doAddService(DefaultCamelContext.java:1097)
at org.apache.camel.impl.DefaultCamelContext.addService(DefaultCamelContext.java:1058)
at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:405)
... 6 more
我猜这些属性没有为生产者设置,但不知道如何在生产者模板中设置。
uri 应该将代理列表作为服务器名称(不要因为我没有创建此组件的语法而责怪我)。
final String uri= "kafka://{{metadata.broker.list}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
我能够通过调试找到解决方案。默认情况下,ProducerTemplate 需要在创建新 object 时未设置的默认参数(这可能是 API 中的错误)。 所以我找到了一种通过 URI 发送参数的方法。以下参数是强制性的
- metadata.broker.list(作为 URI 参数)
- request.required.acks(已默认设置)
- producer.type(在这种情况下不需要,但其他 APIs 需要)
- serializer.class(已默认设置)
- 分区程序 (class)(作为 URI 参数)
- PARTITION_KEY(如Header)
我们没有为 Partition_key 发送参数的选项,因此需要在 Header 中添加它。所以使用sendBodyAndHeader方法发送producer消息
ProducerTemplate tmp = ctx.createProducerTemplate();
tmp.setDefaultEndpoint(ctx.getEndpoint(uri+"&partitioner={{partitioner.class}}"));
ctx.start();
tmp.sendBodyAndHeader("my test is working "+(new Random()).nextInt(100), KafkaConstants.PARTITION_KEY, 1);
tmp.stop();
ctx.stop();