Spring 整合kafka是否支持动态创建topic
Does Spring integeration kafka support dynamic topic creation
我是 spring 整数化 kafka 的新手,我了解 kafka-oubound-channel 适配器。但是有没有一种方法可以让我以编程方式创建主题而无需在上下文中设置 xml?
即:根据我给转换器的消息,我想 post 向为此消息类型创建的 kafka 主题发送消息。
更新:
下面是我最后做的。将欢迎任何更好的解决方案。
<int:channel id="inputForSolrPublish"></int:channel>
<int:service-activator input-channel="inputForSolrPublish"
ref="solrMasterListRouter" >
-->
private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
final String brokerList,
final Message<?> message) throws Exception {
for (String topicName : topicNames) {
createProducerContext(topicName,
brokerList).send(topicName,
message.getHeaders()
.get(KafkaHeaders.MESSAGE_KEY),
message);
}
}
private KafkaProducerContext<String, String> createProducerContext(final String topicName,
final String brokerList) throws Exception {
KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
// Encoder<String> encoder = new
// org.springframework.integration.kafka.serializer.common.StringEncoder<String>();
ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
producerMetadata.setValueClassType(String.class);
producerMetadata.setKeyClassType(String.class);
producerMetadata.setValueEncoder(kafkaSpecificEncoder);
producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
producerMetadata.setAsync(true);
Properties props = buildProducerConfigProperties();
ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
brokerList,
props);
ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
producer.getObject());
kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
config));
return kafkaProducerContext;
}
private Properties buildProducerConfigProperties() {
Properties props = new Properties();
props.put("topic.metadata.refresh.interval.ms",
"3600000");
props.put("message.send.max.retries",
"5");
props.put("tsend.buffer.bytes",
"5242880");
return props;
}
是的,您可以在运行时执行此操作。参见 TopicUtils.ensureTopicCreated
。
您可以像 <service-activator>
一样将其添加为另一个订阅者(第一个订阅者)到 <publish-subscribe-channel>
以发送消息。像这样:
<publish-subscribe-channel id="sendMessageToKafkaChannel"/>
<service-activator input-channel="sendMessageToKafkaChannel" output-channel="nullChannel" order="1"
ref="creatTopicService" method="creatTopic"/>
<int-kafka:outbound-channel-adapter channel="sendMessageToKafkaChannel" order="2"/>
接受 creatTopic
整个消息并从消息中或在注入阶段提取所有必需的参数,例如为第一个 zkAddress
ensureTopicCreated
参数注入 ZookeeperConnect
以提取 getZkConnect()
。
但是你应该明白,如果没有 Kafka 上的现有主题,你就不能拥有 <int-kafka:message-driven-channel-adapter>
。因此,我不确定您将来将如何处理那些动态创建的主题中的消息。尽管 <int-kafka:inbound-channel-adapter>
可能适用于这种情况...
我是 spring 整数化 kafka 的新手,我了解 kafka-oubound-channel 适配器。但是有没有一种方法可以让我以编程方式创建主题而无需在上下文中设置 xml?
即:根据我给转换器的消息,我想 post 向为此消息类型创建的 kafka 主题发送消息。
更新:
下面是我最后做的。将欢迎任何更好的解决方案。
<int:channel id="inputForSolrPublish"></int:channel>
<int:service-activator input-channel="inputForSolrPublish"
ref="solrMasterListRouter" >
-->
private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
final String brokerList,
final Message<?> message) throws Exception {
for (String topicName : topicNames) {
createProducerContext(topicName,
brokerList).send(topicName,
message.getHeaders()
.get(KafkaHeaders.MESSAGE_KEY),
message);
}
}
private KafkaProducerContext<String, String> createProducerContext(final String topicName,
final String brokerList) throws Exception {
KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
// Encoder<String> encoder = new
// org.springframework.integration.kafka.serializer.common.StringEncoder<String>();
ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
producerMetadata.setValueClassType(String.class);
producerMetadata.setKeyClassType(String.class);
producerMetadata.setValueEncoder(kafkaSpecificEncoder);
producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
producerMetadata.setAsync(true);
Properties props = buildProducerConfigProperties();
ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
brokerList,
props);
ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
producer.getObject());
kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
config));
return kafkaProducerContext;
}
private Properties buildProducerConfigProperties() {
Properties props = new Properties();
props.put("topic.metadata.refresh.interval.ms",
"3600000");
props.put("message.send.max.retries",
"5");
props.put("tsend.buffer.bytes",
"5242880");
return props;
}
是的,您可以在运行时执行此操作。参见 TopicUtils.ensureTopicCreated
。
您可以像 <service-activator>
一样将其添加为另一个订阅者(第一个订阅者)到 <publish-subscribe-channel>
以发送消息。像这样:
<publish-subscribe-channel id="sendMessageToKafkaChannel"/>
<service-activator input-channel="sendMessageToKafkaChannel" output-channel="nullChannel" order="1"
ref="creatTopicService" method="creatTopic"/>
<int-kafka:outbound-channel-adapter channel="sendMessageToKafkaChannel" order="2"/>
接受 creatTopic
整个消息并从消息中或在注入阶段提取所有必需的参数,例如为第一个 zkAddress
ensureTopicCreated
参数注入 ZookeeperConnect
以提取 getZkConnect()
。
但是你应该明白,如果没有 Kafka 上的现有主题,你就不能拥有 <int-kafka:message-driven-channel-adapter>
。因此,我不确定您将来将如何处理那些动态创建的主题中的消息。尽管 <int-kafka:inbound-channel-adapter>
可能适用于这种情况...