Different consumers with their respective filter strategies
Can we have two different consumers, each with its own filter strategy, in a Spring Boot application? If so, how do we hook each filter strategy to its respective consumer? (boot-2.3.8) Note: the two listeners use different properties.
@KafkaListener(topics = "topic1", groupId = "group-id1",
        properties = {
                "max.poll.interval.ms: ${max.poll.interval.ms}",
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ":${auto.offset.reset}"
        }, autoStartup = "true")
public void onMessage(
        @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
    processMessageAndAcknowledge(consumerRecord, acknowledgment);
}

@Bean
public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic1() {
    return rec -> true;
}

@KafkaListener(topics = "topic2", groupId = "group-id2",
        properties = {
                "max.poll.interval.ms: ${max.poll.interval.ms}",
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ":${auto.offset.reset}"
        }, autoStartup = "true")
public void onMessage(
        @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
    processMessageAndAcknowledge(consumerRecord, acknowledgment);
}

@Bean
public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic2() {
    return rec -> true;
}
It is not currently possible to override the filter strategy at the listener level; you would have to create two container factories. Feel free to open a new feature request on GitHub: https://github.com/spring-projects/spring-kafka/issues
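For reference, a minimal sketch of that two-factory approach, assuming a shared ConsumerFactory<Object, Object> bean (Boot's auto-configured one, or your own); the factory bean names topic1Factory/topic2Factory are illustrative, and ack mode, error handling, etc. are omitted:

@Bean
ConcurrentKafkaListenerContainerFactory<Object, Object> topic1Factory(
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic1) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    // records for which the strategy returns true are discarded before the listener is invoked
    factory.setRecordFilterStrategy(recordFilterStrategyForTopic1);
    return factory;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Object, Object> topic2Factory(
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic2) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    factory.setRecordFilterStrategy(recordFilterStrategyForTopic2);
    return factory;
}

// each listener then selects its factory explicitly, e.g.
// @KafkaListener(topics = "topic1", groupId = "group-id1", containerFactory = "topic1Factory", ...)
// @KafkaListener(topics = "topic2", groupId = "group-id2", containerFactory = "topic2Factory", ...)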
A quick solution is to write a wrapper strategy that delegates to one filter or the other based on the topic name in the ConsumerRecord:
return rec -> rec.topic().equals("topic1")
        ? recordFilterStrategyForTopic1.filter(rec)
        : recordFilterStrategyForTopic2.filter(rec);
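Registered against the auto-configured Boot container factory, that wrapper could be wired roughly like this; the bean names recordFilterStrategyForTopic1/recordFilterStrategyForTopic2 come from the question, while customizing the default factory this way is an assumption (a self-contained, runnable variant follows in the EDIT below):

@Component
class QuestionFactoryCustomizer {

    QuestionFactoryCustomizer(ConcurrentKafkaListenerContainerFactory<Object, Object> factory,
            RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic1,
            RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic2) {

        // delegate to whichever filter matches the record's topic;
        // a strategy returning true means the record is discarded
        factory.setRecordFilterStrategy(rec -> "topic1".equals(rec.topic())
                ? recordFilterStrategyForTopic1.filter(rec)
                : recordFilterStrategyForTopic2.filter(rec));
    }

}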
EDIT
Here is a complete example. Note that a RecordFilterStrategy returning true means the record is discarded before it reaches the listener, which is why each listener below only receives alternating offsets:
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So67391819Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391819Application.class, args);
    }

    @KafkaListener(id = "so67391819-1", topics = "so67391819-1")
    void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("listener1 " + in + "@" + offset);
    }

    @KafkaListener(id = "so67391819-2", topics = "so67391819-2")
    void listen2(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("listener2 " + in + "@" + offset);
    }

    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so67391819-1").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so67391819-2").partitions(1).replicas(1).build();
    }

    // discard records with even offsets
    @Bean
    RecordFilterStrategy<String, String> f1() {
        return rec -> rec.offset() % 2 == 0;
    }

    // discard records with odd offsets
    @Bean
    RecordFilterStrategy<String, String> f2() {
        return rec -> rec.offset() % 2 == 1;
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> {
                template.send("so67391819-1", "foo" + i);
                template.send("so67391819-2", "bar" + i);
            });
        };
    }

}

@Component
class FactoryCustomizer {

    FactoryCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory,
            RecordFilterStrategy<String, String> f1,
            RecordFilterStrategy<String, String> f2) {

        // wrapper strategy: delegate based on the record's topic
        factory.setRecordFilterStrategy(rec -> rec.topic().endsWith("-1") ? f1.filter(rec) : f2.filter(rec));
    }

}
listener1 foo1@1
listener2 bar0@0
listener1 foo3@3
listener1 foo5@5
listener2 bar2@2
listener1 foo7@7
listener2 bar4@4
listener1 foo9@9
listener2 bar6@6
listener2 bar8@8