Spring Boot app to integrate Kafka with ActiveMQ

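I am trying to build a Spring Boot application that reads messages from Kafka and puts them into ActiveMQ, and vice versa (reads from ActiveMQ and writes to Kafka). I have not found any useful tutorial to kick-start my project.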

See Spring Integration and the Spring Integration Extension for Apache Kafka.

Use inbound and outbound channel adapters:

jms -> kafka

kafka -> jms
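
As a rough sketch of what those two adapter pairs could look like with the Spring Integration Java DSL: this assumes Spring Integration 6.x (on 5.x the entry point is `IntegrationFlows.from` and the JMS types live in `javax.jms`), the `spring-integration-jms` and `spring-integration-kafka` modules on the classpath, and `ConnectionFactory`, `KafkaTemplate` and `ConsumerFactory` beans auto-configured by Spring Boot. The class name and the queue and topic names are made up for illustration.

```java
package com.example.demo;

import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical configuration class; all destination names are placeholders.
@Configuration
public class BridgeFlows {

    // JMS -> Kafka: a message-driven JMS inbound adapter feeds a Kafka outbound adapter.
    @Bean
    public IntegrationFlow jmsToKafka(ConnectionFactory connectionFactory,
            KafkaTemplate<String, String> kafkaTemplate) {

        return IntegrationFlow
                .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination("bridge.in.queue"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .topic("bridge-out-topic"))
                .get();
    }

    // Kafka -> JMS: a message-driven Kafka inbound adapter feeds a JMS outbound adapter.
    // The consumer factory needs a group id, e.g. via spring.kafka.consumer.group-id.
    @Bean
    public IntegrationFlow kafkaToJms(ConsumerFactory<String, String> consumerFactory,
            ConnectionFactory connectionFactory) {

        return IntegrationFlow
                .from(Kafka.messageDrivenChannelAdapter(consumerFactory, "bridge-in-topic"))
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("bridge.out.queue"))
                .get();
    }

}
```

The two directions deliberately use different queue and topic names; pointing both flows at the same pair would make each message bounce back and forth between the brokers.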

Kafka Connect also has some capabilities in this space, but I am not familiar with it.

EDIT

This simple Spring Boot app shows data being transferred from Kafka to RabbitMQ and back again:

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So61069735Application {

    public static void main(String[] args) {
        SpringApplication.run(So61069735Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public ApplicationRunner toKafka() {
        return args -> this.kafkaTemplate.send("so61069735-1", "foo");
    }

    @KafkaListener(id = "so61069735-1", topics = "so61069735-1")
    public void listen1(String in) {
        System.out.println("From Kafka: " + in);
        this.rabbitTemplate.convertAndSend("so61069735-2", in.toUpperCase());
    }

    @RabbitListener(queues = "so61069735-2")
    public void listen2(String in) {
        System.out.println("From Rabbit: " + in);
        this.kafkaTemplate.send("so61069735-3", in + in);
    }

    @KafkaListener(id = "so61069735-3", topics = "so61069735-3")
    public void listen(String in) {
        System.out.println("Final: " + in);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so61069735-1").partitions(1).replicas(1).build();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("so61069735-2").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so61069735-3").partitions(1).replicas(1).build();
    }

}

And in application.properties:

spring.kafka.consumer.auto-offset-reset=earliest

Result

From Kafka: foo
From Rabbit: FOO
Final: FOOFOO