古巴平台 - Spring Kafka 集成

Cuba Platform - Spring Kafka Integration

我必须将 Kafka 集成到 Cuba,我认为这就像添加 spring kafka 依赖项并创建一个 Configuration 注释 class 来初始化自 Cuba 以来的 Kafka 消费者一样简单基于 Spring.

当我添加一个配置时,我发现它在古巴启动时没有被扫描。当我切换到 CUBA 视图时,我注意到只有那些被注释为 ServiceComponent 的 classes 会被读取。然而,即使我添加 Component class,它仍然没有被正确扫描(我添加了一个用 @Value 注释的字段,它寻找一个不存在的 属性 但古巴启动时没有抛出任何错误)

CUBA+Kafka集成有一个简单的例子,你可以在这里找到:https://github.com/cuba-labs/kafka-sample

配置过程取自official Spring documentation

  1. 关键配置class是com.company.kafkasample.config.KafkaConfig。它包含许多可帮助您配置 Kafka 设施的 bean。在此特定示例中,同时配置了生产者和消费者。请注意,配置参数是硬编码的,但这只是一个示例。
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
  1. 之后,您应该能够创建一个服务,通过注入 KafkaTemplate bean 将消息发送到 Kafka 队列。
    @Inject
    private KafkaTemplate<Integer, String> template;


    @Override
    public void sendMessage(String message) {
        log.info("Sending {} using Kafka", message);
        long id = uniqueNumbersService.getNextNumber("users");
        ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
               log.info("Failed to send message {}, error {}", message, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                log.info("Message {} sent", message);
            }
        });
    }
  1. 然后您可以将此服务注入屏幕并在那里使用它。
  2. 至于接收者,你可以在CUBA组件中使用@KafkaListener注解来注解它的方法。例如下面的例子将kafka消息保存到数据库中。
@Component
@DependsOn("consumerFactory")
public class MessageListener {

    @Inject
    private DataManager dataManager;

    @KafkaListener(id = "sample-kafka", topics = "users")
    public void listen1(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
        KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
        kafkaMessage.setKafkaID(id);
        kafkaMessage.setContent(foo);
        dataManager.commit(kafkaMessage);
    }