gcp PubSub 获得 "processing delay was high"
gcp PubSub getting "processing delay was high"
这是我们的设置,我们有一个主题,这个主题有两个订阅 v1 和 v2,设置完全相同,都是 pull 订阅,确认截止时间为 10 秒。
订阅 v1 和 v2 都进入单独的专用数据流,其中 v2 的数据流更优化但几乎做同样的事情。
问题是,我们时不时地看到下面的警告消息,积压仅在 v2 订阅中开始累积,而 v1 显示几乎没有积压。
08:53:56.000 ${MESSAGE_ID} Pubsub processing delay was high at 72 sec.
v2 中的数据流日志除了上面的消息外没有任何明显的显示。事实上,v2 数据流 cpu 使用率低于 v1,所以我无法理解这一点。
问题:
- 什么原因导致处理延迟,我该如何解决?
- 为什么 v1 订阅没有收到相同的警告?
更新于 2017/01/17
正如@ben 所建议的那样,我们在 PubSub 读取后立即执行的 ParDo 过滤操作似乎遇到了意外的高延迟。但是考虑到 getClassroomIds
是一个简单的 java 列表,我不确定如何解决这个问题。一个问题是我们已经申请到 pubsub lazy 的 coder 吗?当调用 ProcessContext#element()
时,是否应用了我们在编码器中定义的解压缩和反序列化?
def processElement(c: DoFn[Entity, Entity]#ProcessContext) = {
val startTime = System.currentTimeMillis()
val entity = c.element()
if (!entity.getClassroomIds.isEmpty) {
c.output(entity)
}
val latencyMs = System.currentTimeMillis() - startTime
if (latencyMs > 1000) {
// We see this warning messages during the load spike
log.warn(s"latency breached 1 second threshold: $latencyMs ms")
}
}
您提到的时间并未准确说明该步骤所花费的时间。特别是,由于 fusion optimization 它反映了过滤操作后的所有 ParDo
。
如果您的管道如下所示:
ReadFromPubSub -> ParDo(过滤) -> ParDo(昂贵) -> ParDo(写入)
Expensive
和 Write
在调用 c.output
returns 之前对来自 Filter
的每个元素执行。这是一个进一步的问题,因为它融合到来自 PubSub 的元素中。
最简单的解决方案可能是执行 Reshuffle:
pipeline
.apply(PubSubIO.read(...))
.apply(keyEachElement by their value and Void)
.apply(new Reshuffle<E, Void>())
.apply(move the key back to the element)
// rest of the pipeline
请注意,使用 Reshuffle
而不是常规 GroupByKey
具有很好的特性,因为它的触发速度比任何正常触发器都快。
这是我们的设置,我们有一个主题,这个主题有两个订阅 v1 和 v2,设置完全相同,都是 pull 订阅,确认截止时间为 10 秒。
订阅 v1 和 v2 都进入单独的专用数据流,其中 v2 的数据流更优化但几乎做同样的事情。
问题是,我们时不时地看到下面的警告消息,积压仅在 v2 订阅中开始累积,而 v1 显示几乎没有积压。
08:53:56.000 ${MESSAGE_ID} Pubsub processing delay was high at 72 sec.
v2 中的数据流日志除了上面的消息外没有任何明显的显示。事实上,v2 数据流 cpu 使用率低于 v1,所以我无法理解这一点。
问题:
- 什么原因导致处理延迟,我该如何解决?
- 为什么 v1 订阅没有收到相同的警告?
更新于 2017/01/17
正如@ben 所建议的那样,我们在 PubSub 读取后立即执行的 ParDo 过滤操作似乎遇到了意外的高延迟。但是考虑到 getClassroomIds
是一个简单的 java 列表,我不确定如何解决这个问题。一个问题是我们已经申请到 pubsub lazy 的 coder 吗?当调用 ProcessContext#element()
时,是否应用了我们在编码器中定义的解压缩和反序列化?
def processElement(c: DoFn[Entity, Entity]#ProcessContext) = {
val startTime = System.currentTimeMillis()
val entity = c.element()
if (!entity.getClassroomIds.isEmpty) {
c.output(entity)
}
val latencyMs = System.currentTimeMillis() - startTime
if (latencyMs > 1000) {
// We see this warning messages during the load spike
log.warn(s"latency breached 1 second threshold: $latencyMs ms")
}
}
您提到的时间并未准确说明该步骤所花费的时间。特别是,由于 fusion optimization 它反映了过滤操作后的所有 ParDo
。
如果您的管道如下所示:
ReadFromPubSub -> ParDo(过滤) -> ParDo(昂贵) -> ParDo(写入)
Expensive
和 Write
在调用 c.output
returns 之前对来自 Filter
的每个元素执行。这是一个进一步的问题,因为它融合到来自 PubSub 的元素中。
最简单的解决方案可能是执行 Reshuffle:
pipeline
.apply(PubSubIO.read(...))
.apply(keyEachElement by their value and Void)
.apply(new Reshuffle<E, Void>())
.apply(move the key back to the element)
// rest of the pipeline
请注意,使用 Reshuffle
而不是常规 GroupByKey
具有很好的特性,因为它的触发速度比任何正常触发器都快。