Spring 多个主题的 Kafka 单一生产者
Spring Kafka single producer for multiple topics
我想使用单个生产者将 JSON 个对象写入多个主题。
下面的代码正在做我想要的,但是使用 setDefaultTopic()
方法告诉 KafkaTemplate
它应该将消息发送到哪个主题感觉不对。
如果我使用 send(String topic, ? payload)
方法,那么 StringJsonMessageConverter
将不起作用。
我的制作人:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}
我的配置:
@Configuration
public class MyProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
return kafkaTemplate;
}
}
关于如何正确执行此操作的任何建议?
更新
我把代码改成了这个...
制作人:
public void send(Message<?> message) {
kafka.send(message);
}
在我的控制器中(我创建消息对象的地方);
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);
MessageHeaders 对象仍将包含 ID 和时间戳。
您必须在调用模板之前手动使用 StringJsonMessageConverter
:
ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic);
kafka.send(producerRecord);
使用 send(Message<?>)
变体时,消息转换器希望主题在消息中 header...
String topic = headers.get(KafkaHeaders.TOPIC, String.class);
如果您可以通过其他方式确定主题,则需要创建自定义转换器。
更改默认主题不是thread-safe。
我想使用单个生产者将 JSON 个对象写入多个主题。
下面的代码正在做我想要的,但是使用 setDefaultTopic()
方法告诉 KafkaTemplate
它应该将消息发送到哪个主题感觉不对。
如果我使用 send(String topic, ? payload)
方法,那么 StringJsonMessageConverter
将不起作用。
我的制作人:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}
我的配置:
@Configuration
public class MyProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
return kafkaTemplate;
}
}
关于如何正确执行此操作的任何建议?
更新
我把代码改成了这个...
制作人:
public void send(Message<?> message) {
kafka.send(message);
}
在我的控制器中(我创建消息对象的地方);
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);
MessageHeaders 对象仍将包含 ID 和时间戳。
您必须在调用模板之前手动使用 StringJsonMessageConverter
:
ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic);
kafka.send(producerRecord);
使用 send(Message<?>)
变体时,消息转换器希望主题在消息中 header...
String topic = headers.get(KafkaHeaders.TOPIC, String.class);
如果您可以通过其他方式确定主题,则需要创建自定义转换器。
更改默认主题不是thread-safe。