gcp PubSub 获得 "processing delay was high"

gcp PubSub getting "processing delay was high"

这是我们的设置,我们有一个主题,这个主题有两个订阅 v1 和 v2,设置完全相同,都是 pull 订阅,确认截止时间为 10 秒。
订阅 v1 和 v2 都进入单独的专用数据流,其中 v2 的数据流更优化但几乎做同样的事情。

问题是,我们时不时地看到下面的警告消息,积压仅在 v2 订阅中开始累积,而 v1 显示几乎没有积压。

08:53:56.000 ${MESSAGE_ID} Pubsub processing delay was high at 72 sec.

v2 中的数据流日志除了上面的消息外没有任何明显的显示。事实上,v2 数据流 cpu 使用率低于 v1,所以我无法理解这一点。

问题:

  1. 什么原因导致处理延迟,我该如何解决?
  2. 为什么 v1 订阅没有收到相同的警告?

更新于 2017/01/17

正如@ben 所建议的那样,我们在 PubSub 读取后立即执行的 ParDo 过滤操作似乎遇到了意外的高延迟。但是考虑到 getClassroomIds 是一个简单的 java 列表,我不确定如何解决这个问题。一个问题是我们已经申请到 pubsub lazy 的 coder 吗?当调用 ProcessContext#element() 时,是否应用了我们在编码器中定义的解压缩和反序列化?

def processElement(c: DoFn[Entity, Entity]#ProcessContext) = {
  val startTime = System.currentTimeMillis()
  val entity = c.element()
  if (!entity.getClassroomIds.isEmpty) {
    c.output(entity)
  }

  val latencyMs = System.currentTimeMillis() - startTime
  if (latencyMs > 1000) {
    // We see this warning messages during the load spike
    log.warn(s"latency breached 1 second threshold: $latencyMs ms")
  }
}

您提到的时间并未准确说明该步骤所花费的时间。特别是,由于 fusion optimization 它反映了过滤操作后的所有 ParDo

如果您的管道如下所示:

ReadFromPubSub -> ParDo(过滤) -> ParDo(昂贵) -> ParDo(写入)

ExpensiveWrite 在调用 c.output returns 之前对来自 Filter 的每个元素执行。这是一个进一步的问题,因为它融合到来自 PubSub 的元素中。

最简单的解决方案可能是执行 Reshuffle:

pipeline
  .apply(PubSubIO.read(...))
  .apply(keyEachElement by their value and Void)
  .apply(new Reshuffle<E, Void>())
  .apply(move the key back to the element)
  // rest of the pipeline

请注意,使用 Reshuffle 而不是常规 GroupByKey 具有很好的特性,因为它的触发速度比任何正常触发器都快。