Multiple Kafka configurations in a reactive Kafka project
I am working on a project with two services that read messages, transform them, and then write to another Kafka topic. The Kafka configuration differs between the two services. This is my application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    sourcetopic1: topic1
    destinationtopic1: topic2
    sourcetopic2: topic3
    destinationtopic2: topic4
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: TestCollector
      client-id: TestCollector01
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
These are the configuration files for my two services:
Service1KafkaConfig
public class KafkaConfig {

    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(@Value("${spring.kafka.sourcetopic1}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
    }
}
Service2Config
public class Service2KafkaConfig {

    @Bean
    public ReceiverOptions<String, String> service2KafkaReceiverOptions(@Value("${spring.kafka.sourcetopic3}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
    }
}
I autowire these beans in the respective services:
Service1: I have not included the ProcessRecord method for service1 because I don't think it is needed for this question. Let me know if it is.
@Slf4j
@Service
public class Service1 implements CommandLineRunner {

    @Autowired
    public ReactiveKafkaConsumerTemplate<String, String> service1KafkaConsumerTemplate;

    public Flux<String> consume1() {
        return service1KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s -> ProcessRecord(s))
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric1[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume1().subscribe();
    }
}
Service2:
@Slf4j
@Service
public class Service2 implements CommandLineRunner {

    @Autowired
    @Qualifier("service2KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate;

    public Flux<String> consume2() {
        return service2KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume2().subscribe();
    }
}
When I run the application, I can only see one consumer starting up and subscribing to topic1. Is it possible to have multiple Kafka consumers running in the same project? If so, can you tell me what needs to be done to get them both running?
It looks like they are both using the same topic and configuration; if there is only one partition and they are in the same consumer group, only one of them will get any data.
If you want them both to receive the same data, you have to put them in different consumer groups.
This works as expected:
@Bean
public ApplicationRunner runner1() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.GROUP_ID_CONFIG, "group1",
                        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .commitBatchSize(1)
                .subscription(Collections.singletonList("INPUT_1"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(record -> {
                    System.out.println("one: " + record.value() + "@" + record.offset());
                    record.receiverOffset().acknowledge();
                })
                .doOnError(System.out::println)
                .subscribe();
    };
}

@Bean
public ApplicationRunner runner2() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.GROUP_ID_CONFIG, "group2",
                        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .commitBatchSize(1)
                .subscription(Collections.singletonList("INPUT_1"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(record -> {
                    System.out.println("two: " + record.value() + "@" + record.offset());
                    record.receiverOffset().acknowledge();
                })
                .doOnError(System.out::println)
                .subscribe();
    };
}
one: foo@16
two: foo@16
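Applied to the original Spring Boot setup, the same fix can be sketched by overriding the group id that comes from the shared KafkaProperties before building each receiver's options. This is only a sketch: the group name TestCollector2 is illustrative, and the property key used here is sourcetopic2, which is what the application.yml above actually defines (the question's sourcetopic3 key does not appear in it).

```java
// Sketch, assuming the rest of Service2KafkaConfig stays as in the question:
// start from the Boot-managed consumer properties, then give this receiver
// its own consumer group so both consumers receive data independently.
@Bean
public ReceiverOptions<String, String> service2KafkaReceiverOptions(
        @Value("${spring.kafka.sourcetopic2}") String topic, KafkaProperties kafkaProperties) {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "TestCollector2"); // hypothetical group name
    return ReceiverOptions.<String, String>create(props)
            .subscription(Collections.singletonList(topic));
}
```

With each ReceiverOptions bean carrying its own group.id, both templates subscribe and consume even when the topics have a single partition.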