Google Cloud PubSub sends the message to more than one consumer (in the same subscription)
I have a Java Spring Boot 2 application (app1) that sends messages to a Google Cloud Pub/Sub topic (it is the publisher).

Another Java Spring Boot 2 application (app2) is subscribed to a subscription to receive those messages. In this case I have more than one instance of it (k8s autoscaling is enabled), so there is more than one pod for this application consuming messages from Pub/Sub.

Some messages are consumed by one app2 instance, but many others are delivered to more than one app2 instance, so the processing of those messages is duplicated.

The consumer (app2) code is as follows:
private static final int ACK_DEAD_LINE_IN_SECONDS = 30;
private static final long POLLING_PERIOD_MS = 250L;
private static final int WINDOW_MAX_SIZE = 1000;
private static final Duration WINDOW_MAX_TIME = Duration.ofSeconds(1L);

@Autowired
private PubSubAdmin pubSubAdmin;

@Bean
public ApplicationRunner runner(PubSubReactiveFactory reactiveFactory) {
    return args -> {
        createSubscription("subscription-id", "topic-id", ACK_DEAD_LINE_IN_SECONDS);
        reactiveFactory.poll("subscription-id", POLLING_PERIOD_MS) // Poll Pub/Sub periodically
                .map(msg -> Pair.of(msg, getMessageValue(msg))) // Pair each message with its extracted payload
                .bufferTimeout(WINDOW_MAX_SIZE, WINDOW_MAX_TIME) // Buffer messages for bulk processing
                .flatMap(this::processBuffer) // Process the buffer
                .doOnError(e -> log.error("Error processing event window", e))
                .retry()
                .subscribe();
    };
}
private void createSubscription(String subscriptionName, String topicName, int ackDeadline) {
    pubSubAdmin.createTopic(topicName);
    try {
        pubSubAdmin.createSubscription(subscriptionName, topicName, ackDeadline);
    } catch (AlreadyExistsException e) {
        log.info("Pub/Sub subscription '{}' already configured for topic '{}': {}", subscriptionName, topicName, e.getMessage());
    }
}
private Flux<Void> processBuffer(List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> msgsWindow) {
    return Flux.fromStream(
                msgsWindow.stream()
                    .collect(Collectors.groupingBy(msg -> msg.getRight().getData())) // Group messages that share the same data
                    .values()
                    .stream())
            .flatMap(this::processDataBuffer);
}
private Mono<Void> processDataBuffer(List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> dataMsgsWindow) {
    return processData(
                dataMsgsWindow.get(0).getRight().getData(),
                dataMsgsWindow.stream()
                    .map(Pair::getRight)
                    .map(PreparedRecordEvent::getRecord)
                    .collect(Collectors.toSet()))
            .doOnSuccess(it ->
                dataMsgsWindow.forEach(msg -> {
                    log.info("Mark msg ACK");
                    msg.getLeft().ack();
                }))
            .doOnError(e -> {
                log.error("Error on PreparedRecordEvent event", e);
                dataMsgsWindow.forEach(msg -> {
                    log.error("Mark msg NACK");
                    msg.getLeft().nack();
                });
            })
            .retry();
}
private Mono<Void> processData(Data data, Set<Record> records) {
    // For each message, run calculations over the records associated with the data
    final DataQuality calculated = calculatorService.calculateDataQualityFor(data, records); // Arithmetic calculations
    return this.daasClient.updateMetrics(calculated) // Update the DB record through a DaaS that wraps DB access
            .flatMap(it -> {
                if (it.getProcessedRows() >= it.getValidRows()) {
                    return finish(data);
                }
                return Mono.just(data);
            })
            .then();
}
private Mono<Data> finish(Data data) {
    return dataClient.updateStatus(data.getId(), DataStatus.DONE) // Update the DB record through a DaaS that wraps DB access
            .doOnSuccess(updatedData -> pubSubClient.publish(
                    new Qa0DonedataEvent(updatedData))) // Publish a new event on another topic
            .doOnError(err -> log.error("Error finishing data", err))
            .onErrorReturn(data);
}
I need each message to be consumed by one and only one app2 instance. Does anyone know whether this is possible? Any ideas on how to achieve it?

Maybe the right approach is to create one subscription for every app2 instance and configure the topic to send each message to exactly one subscription instead of to every subscription. Is that possible?
According to the official documentation, once a message is sent to a subscriber, Pub/Sub tries not to deliver it to any other subscriber on the same subscription (the app2 instances are subscribers of the same subscription):
Once a message is sent to a subscriber, the subscriber should acknowledge the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it. Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged. While a message is outstanding to a subscriber, however, Pub/Sub tries not to deliver it to any other subscriber on the same subscription. The subscriber has a configurable, limited amount of time -- known as the ackDeadline -- to acknowledge the outstanding message. Once the deadline passes, the message is no longer considered outstanding, and Pub/Sub will attempt to redeliver the message.
In general, Cloud Pub/Sub has at-least-once delivery semantics. That means it is possible for messages that have already been acknowledged to be redelivered, and for a message to be delivered to multiple subscribers receiving messages on the same subscription. These two cases should be relatively rare for well-behaved subscribers, but without tracking the IDs of all messages delivered to all subscribers, it is not possible to guarantee there will be no duplicates.

If it is happening with some frequency, it is worth checking whether your messages are being acknowledged within the ack deadline. You are buffering messages for up to 1 second, which should be relatively small compared to your 30-second ack deadline, but it also depends on how long the messages ultimately take to process. For example, if the buffer is processed sequentially, the later messages in your 1000-message buffer may not be handled in time. You can look at the subscription/expired_ack_deadlines_count metric in Cloud Monitoring to determine whether your acks for messages are indeed late. Note that late acks for even a small number of messages could result in more duplicates. See the "Message Redelivery & Duplication Rate" section of the "Fine-tuning Pub/Sub performance with batch and flow control settings" post.
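If late acks do turn out to be the cause, one possible mitigation (a minimal sketch, not part of the original code; the wrapper method and the EXTENDED_ACK_DEADLINE_SECONDS constant are hypothetical) is to extend the ack deadline of every buffered message before bulk processing starts, so Pub/Sub does not expire and redeliver them to another app2 instance in the meantime. Spring Cloud GCP's AcknowledgeablePubsubMessage exposes modifyAckDeadline(int) for this:

// Hypothetical constant: tune to the worst-case processing time of a full buffer
private static final int EXTENDED_ACK_DEADLINE_SECONDS = 120;

// Sketch: extend the deadline of each buffered message, then delegate to the
// existing processBuffer. modifyAckDeadline returns a future; it is fired and
// forgotten here, which is acceptable for a best-effort extension.
private Flux<Void> processBufferWithExtendedDeadline(
        List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> msgsWindow) {
    msgsWindow.forEach(msg -> msg.getLeft().modifyAckDeadline(EXTENDED_ACK_DEADLINE_SECONDS));
    return processBuffer(msgsWindow);
}

In the pipeline, .flatMap(this::processBuffer) would then become .flatMap(this::processBufferWithExtendedDeadline).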
OK, after testing, reading documentation, and reviewing the code, I found a "small" bug in it.

We had a misplaced "retry" in the processDataBuffer method: when an error occurred, the messages in the buffer were marked NACK, so they were delivered to another instance; but because of the retry they were executed again, this time successfully, so the messages were also marked ACK. As a result, some of them were processed twice.
private Mono<Void> processDataBuffer(List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> dataMsgsWindow) {
    return processData(
                dataMsgsWindow.get(0).getRight().getData(),
                dataMsgsWindow.stream()
                    .map(Pair::getRight)
                    .map(PreparedRecordEvent::getRecord)
                    .collect(Collectors.toSet()))
            .doOnSuccess(it ->
                dataMsgsWindow.forEach(msg -> {
                    log.info("Mark msg ACK");
                    msg.getLeft().ack();
                }))
            .doOnError(e -> {
                log.error("Error on PreparedRecordEvent event", e);
                dataMsgsWindow.forEach(msg -> {
                    log.error("Mark msg NACK");
                    msg.getLeft().nack();
                });
            })
            .retry(); // this retry has been deleted
}
That solved my problem.

Even after fixing the bug above, I kept receiving some duplicate messages. It is well known that Google Cloud Pub/Sub does not guarantee exactly-once delivery, and this is especially noticeable when you use buffers or windows. That is exactly my scenario, so I had to implement a mechanism to remove duplicates based on the message ID.
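For reference, here is a minimal sketch of such a deduplication mechanism (the names are illustrative, not my exact implementation). It keeps an in-memory set of already-seen Pub/Sub message IDs and acks-and-drops any redelivery. Note that an in-memory set only catches duplicates within a single app2 instance and grows without bound; deduplicating across instances needs a shared store (e.g. Redis) and some eviction policy (e.g. a TTL cache):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory deduplication keyed by Pub/Sub message ID
private final Set<String> seenMessageIds = ConcurrentHashMap.newKeySet();

private boolean isDuplicate(AcknowledgeablePubsubMessage msg) {
    // Set.add returns false when the ID was already present
    return !seenMessageIds.add(msg.getPubsubMessage().getMessageId());
}

// Usage in the pipeline, before buffering:
// reactiveFactory.poll("subscription-id", POLLING_PERIOD_MS)
//         .filter(msg -> {
//             if (isDuplicate(msg)) {
//                 msg.ack(); // already processed: just ack it and drop it
//                 return false;
//             }
//             return true;
//         })
//         ...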