古巴平台 - Spring Kafka 集成
Cuba Platform - Spring Kafka Integration
我必须将 Kafka 集成到 Cuba,我认为这就像添加 spring kafka 依赖项并创建一个 Configuration
注释 class 来初始化自 Cuba 以来的 Kafka 消费者一样简单基于 Spring.
当我添加一个配置时,我发现它在古巴启动时没有被扫描。当我切换到 CUBA 视图时,我注意到只有那些被注释为 Service
或 Component
的 classes 会被读取。然而,即使我添加 Component
class,它仍然没有被正确扫描(我添加了一个用 @Value
注释的字段,它寻找一个不存在的 属性 但古巴启动时没有抛出任何错误)
CUBA+Kafka集成有一个简单的例子,你可以在这里找到:https://github.com/cuba-labs/kafka-sample
配置过程取自official Spring documentation。
- 关键配置class是com.company.kafkasample.config.KafkaConfig。它包含许多可帮助您配置 Kafka 设施的 bean。在此特定示例中,同时配置了生产者和消费者。请注意,配置参数是硬编码的,但这只是一个示例。
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
- 之后,您应该能够创建一个服务,通过注入 KafkaTemplate bean 将消息发送到 Kafka 队列。
@Inject
private KafkaTemplate<Integer, String> template;
@Override
public void sendMessage(String message) {
log.info("Sending {} using Kafka", message);
long id = uniqueNumbersService.getNextNumber("users");
ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable ex) {
log.info("Failed to send message {}, error {}", message, ex.getMessage());
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
log.info("Message {} sent", message);
}
});
}
- 然后您可以将此服务注入屏幕并在那里使用它。
- 至于接收者,你可以在CUBA组件中使用
@KafkaListener
注解来注解它的方法。例如下面的例子将kafka消息保存到数据库中。
@Component
@DependsOn("consumerFactory")
public class MessageListener {
@Inject
private DataManager dataManager;
@KafkaListener(id = "sample-kafka", topics = "users")
public void listen1(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
kafkaMessage.setKafkaID(id);
kafkaMessage.setContent(foo);
dataManager.commit(kafkaMessage);
}
我必须将 Kafka 集成到 Cuba,我认为这就像添加 spring kafka 依赖项并创建一个 Configuration
注释 class 来初始化自 Cuba 以来的 Kafka 消费者一样简单基于 Spring.
当我添加一个配置时,我发现它在古巴启动时没有被扫描。当我切换到 CUBA 视图时,我注意到只有那些被注释为 Service
或 Component
的 classes 会被读取。然而,即使我添加 Component
class,它仍然没有被正确扫描(我添加了一个用 @Value
注释的字段,它寻找一个不存在的 属性 但古巴启动时没有抛出任何错误)
CUBA+Kafka集成有一个简单的例子,你可以在这里找到:https://github.com/cuba-labs/kafka-sample
配置过程取自official Spring documentation。
- 关键配置class是com.company.kafkasample.config.KafkaConfig。它包含许多可帮助您配置 Kafka 设施的 bean。在此特定示例中,同时配置了生产者和消费者。请注意,配置参数是硬编码的,但这只是一个示例。
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
- 之后,您应该能够创建一个服务,通过注入 KafkaTemplate bean 将消息发送到 Kafka 队列。
@Inject
private KafkaTemplate<Integer, String> template;
@Override
public void sendMessage(String message) {
log.info("Sending {} using Kafka", message);
long id = uniqueNumbersService.getNextNumber("users");
ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable ex) {
log.info("Failed to send message {}, error {}", message, ex.getMessage());
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
log.info("Message {} sent", message);
}
});
}
- 然后您可以将此服务注入屏幕并在那里使用它。
- 至于接收者,你可以在CUBA组件中使用
@KafkaListener
注解来注解它的方法。例如下面的例子将kafka消息保存到数据库中。
@Component
@DependsOn("consumerFactory")
public class MessageListener {
@Inject
private DataManager dataManager;
@KafkaListener(id = "sample-kafka", topics = "users")
public void listen1(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
kafkaMessage.setKafkaID(id);
kafkaMessage.setContent(foo);
dataManager.commit(kafkaMessage);
}