在 PCF 中自动缩放应用程序时 Kafka 回复超时
Kafka Reply Time out when applications are auto scaled in PCF
我正在使用 ReplyingKafkaTemplate 进行 Kafka 同步响应,并且只有一次实例 运行ning 时我才能得到响应。但是,如果应用程序扩展到多个实例,我会收到超时错误。
来自文档
When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply.
根据文档,如果我们需要使用不同的消费者组,这是否意味着我们需要手动 运行 具有不同消费者组的实例?如果我们使用像 PCF 这样的工具,我们如何处理自动缩放。下面是我的kafka配置。
@Configuration
@EnableKafka
public class KafkaConfig {
//My Properties
@Bean
public Map < String, Object > producerConfig() {
Map < String, Object > props = new HashMap < > ();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public Map < String, Object > consumerConfig() {
Map < String, Object > props = new HashMap < > ();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
return props;
}
@Bean
public ProducerFactory < String, String > producerFactory() {
return new DefaultKafkaProducerFactory(producerConfig());
}
@Bean
public ConsumerFactory < String, String > consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfig());
}
@Bean
public KafkaTemplate < String, String > kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public ReplyingKafkaTemplate < String, String, String > replyingKafkaTemplate
(ProducerFactory < String, String > pf, KafkaMessageListenerContainer < String, String > container) {
ReplyingKafkaTemplate < String, String, String > rkt = new ReplyingKafkaTemplate(pf, container);
rkt.setDefaultReplyTimeout(Duration.ofMillis(slaTime));
rkt.setSharedReplyTopic(true);
return rkt;
}
@Bean
public KafkaMessageListenerContainer < String, String > replyContainer(ConsumerFactory < String, String > cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return new KafkaMessageListenerContainer < > (cf, containerProperties);
}
@Bean
public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, String >> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory < String, String > factory = new ConcurrentKafkaListenerContainerFactory < > ();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(replyingKafkaTemplate(producerFactory(), replyContainer(consumerFactory())));
return factory;
}
}
在replyContainer
bean中,添加
containerProperties.setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
containerProperties.setKafkaConsumerProperties(props);
在replyingKafkaTemplate
中添加
rkt.setSharedReplyTopic(true);
请求主题至少需要与最大值 scale-out 一样多的分区。
回复主题可以有任意数量的分区(包括1)。
使用 PCF,您可以使用 instanceIndex
构造 groupId 而不是使其随机。
您也可以将 instanceIndex
用作 REPLY_PARTITION
header 并使用固定回复分区;在这种情况下,您至少需要与您希望使用的最大值 instanceIndex
一样多的分区。
我正在使用 ReplyingKafkaTemplate 进行 Kafka 同步响应,并且只有一次实例 运行ning 时我才能得到响应。但是,如果应用程序扩展到多个实例,我会收到超时错误。
来自文档
When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply.
根据文档,如果我们需要使用不同的消费者组,这是否意味着我们需要手动 运行 具有不同消费者组的实例?如果我们使用像 PCF 这样的工具,我们如何处理自动缩放。下面是我的kafka配置。
@Configuration
@EnableKafka
public class KafkaConfig {
//My Properties
@Bean
public Map < String, Object > producerConfig() {
Map < String, Object > props = new HashMap < > ();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public Map < String, Object > consumerConfig() {
Map < String, Object > props = new HashMap < > ();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
return props;
}
@Bean
public ProducerFactory < String, String > producerFactory() {
return new DefaultKafkaProducerFactory(producerConfig());
}
@Bean
public ConsumerFactory < String, String > consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfig());
}
@Bean
public KafkaTemplate < String, String > kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public ReplyingKafkaTemplate < String, String, String > replyingKafkaTemplate
(ProducerFactory < String, String > pf, KafkaMessageListenerContainer < String, String > container) {
ReplyingKafkaTemplate < String, String, String > rkt = new ReplyingKafkaTemplate(pf, container);
rkt.setDefaultReplyTimeout(Duration.ofMillis(slaTime));
rkt.setSharedReplyTopic(true);
return rkt;
}
@Bean
public KafkaMessageListenerContainer < String, String > replyContainer(ConsumerFactory < String, String > cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return new KafkaMessageListenerContainer < > (cf, containerProperties);
}
@Bean
public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, String >> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory < String, String > factory = new ConcurrentKafkaListenerContainerFactory < > ();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(replyingKafkaTemplate(producerFactory(), replyContainer(consumerFactory())));
return factory;
}
}
在replyContainer
bean中,添加
containerProperties.setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
containerProperties.setKafkaConsumerProperties(props);
在replyingKafkaTemplate
中添加
rkt.setSharedReplyTopic(true);
请求主题至少需要与最大值 scale-out 一样多的分区。 回复主题可以有任意数量的分区(包括1)。
使用 PCF,您可以使用 instanceIndex
构造 groupId 而不是使其随机。
您也可以将 instanceIndex
用作 REPLY_PARTITION
header 并使用固定回复分区;在这种情况下,您至少需要与您希望使用的最大值 instanceIndex
一样多的分区。