连接到多个集群 spring kafka

Connecting to multiple clusters spring kafka

我想从一个kafka集群消费消息并发布到另一个kafka集群。想知道如何使用 spring-kafka 进行配置吗?

只需配置具有不同 bootstrap.servers 属性的消费者和生产者工厂。

如果您使用的是 Spring 引导,请参阅

https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#spring.kafka.consumer.bootstrap-servers

https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#spring.kafka.producer.bootstrap-servers

如果您要创建自己的工厂 @Beans,请在那里设置属性。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting

你可以使用 spring 云流卡夫卡活页夹。

创建两个流,一个用于消费,一个用于生产。

消费者

public interface InputStreamExample{
    String INPUT = "consumer-in";
    @Input(INPUT)
    MessageChannel readFromKafka();
}

制作人

public interface ProducerStreamExample{

    String OUTPUT = "produce-out";

    @Output(OUTPUT)
    MessageChannel produceToKafka();
}

消费消息用途:

@StreamListener(value = ItemStream.INPUT)
public void processMessage(){
    /*
    code goes here
    */
}

用于制作

//here producerStreamExample is instance of ProducerStreamExample 
producerStreamExample.produceToKafka().send(/*message goes here*/);

现在使用binder配置consumer和producer集群,消费者集群可以使用consumer-in,生产集群可以使用produce-out。

属性文件

spring.cloud.stream.binders.kafka-a.environment.spring.cloud.stream.kafka.binder.brokers:<consumer cluster>
#other properties for this binders

#bind kafka-a to consumer-in
spring.cloud.stream.bindings.consumer-in.binder=kafka-a  #kafka-a binding to consumer-in
#similary other properties of consumer-in, like
spring.cloud.stream.bindings.consumer-in.destination=<topic>
spring.cloud.stream.bindings.consumer-in.group=<consumer group>


#now configure cluster to produce 

spring.cloud.stream.binders.kafka-b.environment.spring.cloud.stream.kafka.binder.brokers:<cluster where to produce>


spring.cloud.stream.bindings.produce-out.binder=kafka-b    #here kafka-b, binding to produce-out
#similary you can do other configuration like topic
spring.cloud.stream.bindings.produce-out.destination=<topic>

更多配置参考这里:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html