用于集成 Kafka 与 ActiveMQ 的 Spring Boot 应用程序
spring boot app to integrate kafka with active mq
我正在尝试构建一个 Spring Boot 应用程序,它从 Kafka 读取消息并将其写入 ActiveMQ
反之亦然(从 activeMQ 读取并写入 kafka)
我没有找到任何有用的教程来快速启动我的项目
参见 Spring Integration 以及 Spring Integration Extension for Apache Kafka。
使用入站和出站通道适配器
jms -> kafka
kafka -> jms
Kafka Connect 在这一领域也具备一些能力,但我对它并不熟悉。
编辑
这个简单的 Spring Boot 应用程序演示了如何将数据从 Kafka 传输到 RabbitMQ,以及如何反向传输:
package com.example.demo;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
/**
 * Demo Spring Boot application that relays one message Kafka -> RabbitMQ -> Kafka.
 *
 * <p>Flow, as wired in this class:
 * <ol>
 *   <li>{@link #toKafka()} sends the string {@code "foo"} to Kafka topic {@code so61069735-1}.</li>
 *   <li>{@link #listen1(String)} consumes that topic, upper-cases the payload and hands it to
 *       {@link RabbitTemplate#convertAndSend(String, Object)} with {@code so61069735-2} as the
 *       first argument.</li>
 *   <li>{@link #listen2(String)} consumes queue {@code so61069735-2}, concatenates the payload
 *       with itself and sends it to Kafka topic {@code so61069735-3}.</li>
 *   <li>{@link #listen(String)} consumes that topic and prints the final value.</li>
 * </ol>
 */
@SpringBootApplication
public class So61069735Application {

    /** Kafka topic the demo message is first published to; also used as the listener id. */
    private static final String INBOUND_TOPIC = "so61069735-1";

    /** RabbitMQ queue name, also used as the routing key when forwarding from Kafka. */
    private static final String BRIDGE_QUEUE = "so61069735-2";

    /** Kafka topic that receives messages coming back from RabbitMQ; also a listener id. */
    private static final String OUTBOUND_TOPIC = "so61069735-3";

    public static void main(String[] args) {
        SpringApplication.run(So61069735Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Returns a runner that publishes the seed message {@code "foo"} to {@code so61069735-1}.
     *
     * @return the runner bean; the result of {@code send} is not inspected
     */
    @Bean
    public ApplicationRunner toKafka() {
        return args -> this.kafkaTemplate.send(INBOUND_TOPIC, "foo");
    }

    /**
     * Kafka -> RabbitMQ leg: prints the record value and forwards it upper-cased.
     *
     * @param in the value received from topic {@code so61069735-1}
     */
    @KafkaListener(id = INBOUND_TOPIC, topics = INBOUND_TOPIC)
    public void listen1(String in) {
        System.out.println("From Kafka: " + in);
        // Locale.ROOT keeps the transformation independent of the JVM default locale
        // (a Turkish default locale would otherwise map 'i' to a dotted capital 'İ').
        this.rabbitTemplate.convertAndSend(BRIDGE_QUEUE, in.toUpperCase(java.util.Locale.ROOT));
    }

    /**
     * RabbitMQ -> Kafka leg: prints the message and sends it, doubled, to {@code so61069735-3}.
     *
     * @param in the message body received from queue {@code so61069735-2}
     */
    @RabbitListener(queues = BRIDGE_QUEUE)
    public void listen2(String in) {
        System.out.println("From Rabbit: " + in);
        this.kafkaTemplate.send(OUTBOUND_TOPIC, in + in);
    }

    /**
     * Final consumer: prints whatever arrives on {@code so61069735-3}.
     *
     * @param in the value received from topic {@code so61069735-3}
     */
    @KafkaListener(id = OUTBOUND_TOPIC, topics = OUTBOUND_TOPIC)
    public void listen(String in) {
        System.out.println("Final: " + in);
    }

    /** Declares topic {@code so61069735-1} with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(INBOUND_TOPIC).partitions(1).replicas(1).build();
    }

    /** Declares the durable queue {@code so61069735-2}. */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(BRIDGE_QUEUE).build();
    }

    /** Declares topic {@code so61069735-3} with one partition and one replica. */
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name(OUTBOUND_TOPIC).partitions(1).replicas(1).build();
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
结果
From Kafka: foo
From Rabbit: FOO
Final: FOOFOO
我正在尝试构建一个 Spring Boot 应用程序,它从 Kafka 读取消息并将其写入 ActiveMQ,反之亦然(从 ActiveMQ 读取并写入 Kafka)。我没有找到任何有用的教程来快速启动我的项目。
参见 Spring Integration 以及 Spring Integration Extension for Apache Kafka。
使用入站和出站通道适配器
jms -> kafka
kafka -> jms
Kafka Connect 在这一领域也具备一些能力,但我对它并不熟悉。
编辑
这个简单的 Spring Boot 应用程序演示了如何将数据从 Kafka 传输到 RabbitMQ,以及如何反向传输:
package com.example.demo;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
/**
 * Demo Spring Boot application that relays one message Kafka -> RabbitMQ -> Kafka.
 *
 * <p>Flow, as wired in this class:
 * <ol>
 *   <li>{@link #toKafka()} sends the string {@code "foo"} to Kafka topic {@code so61069735-1}.</li>
 *   <li>{@link #listen1(String)} consumes that topic, upper-cases the payload and hands it to
 *       {@link RabbitTemplate#convertAndSend(String, Object)} with {@code so61069735-2} as the
 *       first argument.</li>
 *   <li>{@link #listen2(String)} consumes queue {@code so61069735-2}, concatenates the payload
 *       with itself and sends it to Kafka topic {@code so61069735-3}.</li>
 *   <li>{@link #listen(String)} consumes that topic and prints the final value.</li>
 * </ol>
 */
@SpringBootApplication
public class So61069735Application {

    /** Kafka topic the demo message is first published to; also used as the listener id. */
    private static final String INBOUND_TOPIC = "so61069735-1";

    /** RabbitMQ queue name, also used as the routing key when forwarding from Kafka. */
    private static final String BRIDGE_QUEUE = "so61069735-2";

    /** Kafka topic that receives messages coming back from RabbitMQ; also a listener id. */
    private static final String OUTBOUND_TOPIC = "so61069735-3";

    public static void main(String[] args) {
        SpringApplication.run(So61069735Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Returns a runner that publishes the seed message {@code "foo"} to {@code so61069735-1}.
     *
     * @return the runner bean; the result of {@code send} is not inspected
     */
    @Bean
    public ApplicationRunner toKafka() {
        return args -> this.kafkaTemplate.send(INBOUND_TOPIC, "foo");
    }

    /**
     * Kafka -> RabbitMQ leg: prints the record value and forwards it upper-cased.
     *
     * @param in the value received from topic {@code so61069735-1}
     */
    @KafkaListener(id = INBOUND_TOPIC, topics = INBOUND_TOPIC)
    public void listen1(String in) {
        System.out.println("From Kafka: " + in);
        // Locale.ROOT keeps the transformation independent of the JVM default locale
        // (a Turkish default locale would otherwise map 'i' to a dotted capital 'İ').
        this.rabbitTemplate.convertAndSend(BRIDGE_QUEUE, in.toUpperCase(java.util.Locale.ROOT));
    }

    /**
     * RabbitMQ -> Kafka leg: prints the message and sends it, doubled, to {@code so61069735-3}.
     *
     * @param in the message body received from queue {@code so61069735-2}
     */
    @RabbitListener(queues = BRIDGE_QUEUE)
    public void listen2(String in) {
        System.out.println("From Rabbit: " + in);
        this.kafkaTemplate.send(OUTBOUND_TOPIC, in + in);
    }

    /**
     * Final consumer: prints whatever arrives on {@code so61069735-3}.
     *
     * @param in the value received from topic {@code so61069735-3}
     */
    @KafkaListener(id = OUTBOUND_TOPIC, topics = OUTBOUND_TOPIC)
    public void listen(String in) {
        System.out.println("Final: " + in);
    }

    /** Declares topic {@code so61069735-1} with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(INBOUND_TOPIC).partitions(1).replicas(1).build();
    }

    /** Declares the durable queue {@code so61069735-2}. */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(BRIDGE_QUEUE).build();
    }

    /** Declares topic {@code so61069735-3} with one partition and one replica. */
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name(OUTBOUND_TOPIC).partitions(1).replicas(1).build();
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
结果
From Kafka: foo
From Rabbit: FOO
Final: FOOFOO