spring-kafka - how to read one topic from the beginning, while reading another one from the end?
I'm writing a spring-kafka application in which I need to read from two topics, test1 and test2:
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(id = "bar", topicPartitions = {
            @TopicPartition(topic = "test1", partitions = { "0" }),
            @TopicPartition(topic = "test2", partitions = { "0" }) })
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}
My configuration looks like this:
@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
I need to be able to read only the latest messages from "test1", while reading all messages from "test2" from the very beginning.
I'm only interested in the "test2" messages at application startup, but the "test1" messages need to be read continuously for as long as the application is running.
Is there a way to configure this?
Here is an approach that worked for me:
@KafkaListener(id = "receiver-api", topicPartitions = {
        @TopicPartition(topic = "schema.topic",
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
        @TopicPartition(topic = "data.topic", partitions = { "0" }) })
public void receiveMessage(String message) {
    try {
        JSONObject incomingJsonObject = new JSONObject(message);
        if (!incomingJsonObject.isNull("data")) {
            handleSchemaMessage(incomingJsonObject);
        } else {
            handleDataMessage(incomingJsonObject);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Using the partitionOffsets annotation (import org.springframework.kafka.annotation.PartitionOffset;)
was the key to always being able to read a specific topic from the beginning, while "tailing" the other topic as usual.
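Applied to the topics from the original question, the same pattern might look like the sketch below (the partition numbers are assumptions carried over from the question; this needs a running broker and a Spring context to actually execute):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;

public class Receiver {

    // test2 is rewound to offset 0 on every startup, so all of its
    // messages are replayed; test1 has no initialOffset, so it is
    // consumed from the committed/latest position as usual.
    @KafkaListener(id = "bar", topicPartitions = {
            @TopicPartition(topic = "test2",
                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
            @TopicPartition(topic = "test1", partitions = { "0" }) })
    public void receiveMessage(String message) {
        // handle messages from both topics here
    }
}
```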
I also struggled with this and would like to suggest a more generic solution.
While your solution works, you need to hard-code the partitions.
You can instead have the class annotated with @KafkaListener implement the ConsumerSeekAware interface.
This gives you three methods you can use to seek to a specific offset. One of them is called as soon as the partitions are assigned, so it could look like this:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    assignments.keySet().stream()
            .filter(partition -> "MYTOPIC".equals(partition.topic()))
            .forEach(partition -> callback.seekToBeginning("MYTOPIC", partition.partition()));
}
This way, when you decide to add more partitions to the topic, you don't need to touch any code :)
Hope this helps someone.
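Putting this together for the original test1/test2 scenario, a complete listener class might look like the following sketch. It assumes a spring-kafka version in which ConsumerSeekAware declares all three callbacks as abstract methods (newer versions provide default implementations, so the empty overrides could be dropped); like the snippets above, it needs a broker and a Spring context to run:

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class Receiver implements ConsumerSeekAware {

    // subscribe by topic name, letting Kafka assign partitions
    @KafkaListener(id = "bar", topics = { "test1", "test2" })
    public void receiveMessage(String message) {
        // handle messages from both topics here
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // not needed for this use case
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // rewind every assigned partition of test2 to the beginning;
        // test1 partitions are left at their committed/latest position
        assignments.keySet().stream()
                .filter(partition -> "test2".equals(partition.topic()))
                .forEach(partition -> callback.seekToBeginning("test2", partition.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // not needed for this use case
    }
}
```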