连接到多个集群 spring kafka
Connecting to multiple clusters spring kafka
我想从一个kafka集群消费消息并发布到另一个kafka集群。想知道如何使用 spring-kafka 进行配置吗?
只需配置具有不同 bootstrap.servers
属性的消费者和生产者工厂。
如果您使用的是 Spring 引导,请参阅
和
如果您要创建自己的工厂 @Bean
s,请在那里设置属性。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting
你可以使用 spring 云流卡夫卡活页夹。
创建两个流,一个用于消费,一个用于生产。
消费者
public interface InputStreamExample{
String INPUT = "consumer-in";
@Input(INPUT)
MessageChannel readFromKafka();
}
制作人
public interface ProducerStreamExample{
String OUTPUT = "produce-out";
@Output(OUTPUT)
MessageChannel produceToKafka();
}
消费消息用途:
@StreamListener(value = ItemStream.INPUT)
public void processMessage(){
/*
code goes here
*/
}
用于制作
//here producerStreamExample is instance of ProducerStreamExample
producerStreamExample.produceToKafka().send(/*message goes here*/);
现在使用binder配置consumer和producer集群,消费者集群可以使用consumer-in,生产集群可以使用produce-out。
属性文件
spring.cloud.stream.binders.kafka-a.environment.spring.cloud.stream.kafka.binder.brokers:<consumer cluster>
#other properties for this binders
#bind kafka-a to consumer-in
spring.cloud.stream.bindings.consumer-in.binder=kafka-a #kafka-a binding to consumer-in
#similary other properties of consumer-in, like
spring.cloud.stream.bindings.consumer-in.destination=<topic>
spring.cloud.stream.bindings.consumer-in.group=<consumer group>
#now configure cluster to produce
spring.cloud.stream.binders.kafka-b.environment.spring.cloud.stream.kafka.binder.brokers:<cluster where to produce>
spring.cloud.stream.bindings.produce-out.binder=kafka-b #here kafka-b, binding to produce-out
#similary you can do other configuration like topic
spring.cloud.stream.bindings.produce-out.destination=<topic>
更多配置参考这里:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html
我想从一个kafka集群消费消息并发布到另一个kafka集群。想知道如何使用 spring-kafka 进行配置吗?
只需配置具有不同 bootstrap.servers
属性的消费者和生产者工厂。
如果您使用的是 Spring 引导,请参阅
和
如果您要创建自己的工厂 @Bean
s,请在那里设置属性。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting
你可以使用 spring 云流卡夫卡活页夹。
创建两个流,一个用于消费,一个用于生产。
消费者
public interface InputStreamExample{
String INPUT = "consumer-in";
@Input(INPUT)
MessageChannel readFromKafka();
}
制作人
public interface ProducerStreamExample{
String OUTPUT = "produce-out";
@Output(OUTPUT)
MessageChannel produceToKafka();
}
消费消息用途:
@StreamListener(value = ItemStream.INPUT)
public void processMessage(){
/*
code goes here
*/
}
用于制作
//here producerStreamExample is instance of ProducerStreamExample
producerStreamExample.produceToKafka().send(/*message goes here*/);
现在使用binder配置consumer和producer集群,消费者集群可以使用consumer-in,生产集群可以使用produce-out。
属性文件
spring.cloud.stream.binders.kafka-a.environment.spring.cloud.stream.kafka.binder.brokers:<consumer cluster>
#other properties for this binders
#bind kafka-a to consumer-in
spring.cloud.stream.bindings.consumer-in.binder=kafka-a #kafka-a binding to consumer-in
#similary other properties of consumer-in, like
spring.cloud.stream.bindings.consumer-in.destination=<topic>
spring.cloud.stream.bindings.consumer-in.group=<consumer group>
#now configure cluster to produce
spring.cloud.stream.binders.kafka-b.environment.spring.cloud.stream.kafka.binder.brokers:<cluster where to produce>
spring.cloud.stream.bindings.produce-out.binder=kafka-b #here kafka-b, binding to produce-out
#similary you can do other configuration like topic
spring.cloud.stream.bindings.produce-out.destination=<topic>
更多配置参考这里:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html