为什么光束管道中的 GroupByKey 会复制元素(当 运行 在 Google 数据流上时)?

Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?

背景

我们有一个从 PubSub 接收消息开始的管道,每个消息都有一个文件名。这些文件被分解为行级别,解析为 JSON 对象节点,然后发送到外部解码服务(解码一些编码数据)。对象节点最终转换为 Table 行并写入 Big Query。

在 PubSub 消息到达解码服务之前,Dataflow 似乎没有确认它们。解码服务速度慢,一次发送多条消息导致积压。这意味着与 PubSub 消息关联的行可能需要一些时间才能到达解码服务。结果,PubSub 没有收到确认并重新发送消息。我的第一个补救尝试是使用 withAttributeId() 为传递给 Reader 的每个 PubSub 消息添加一个属性。但是,在测试中,这只能防止到达的重复项靠得很近。

我的第二次尝试是在 PubSub 读取之后添加一个 fusion breaker (example)。这只是执行一个不必要的 GroupByKey 然后取消分组,其想法是 GroupByKey 强制数据流确认 PubSub 消息。

问题

上面讨论的融合断路器的工作原理是它阻止 PubSub 重新发送消息,但我发现 此 GroupByKey 输出的元素多于它接收的元素:See image。

为了尝试诊断这个问题,我删除了部分管道以获得仍然表现出这种行为的简单管道。即使在

时,行为仍然存在

我观察到的行为是:

  1. 一些接收到的消息直接通过 GroupByKey。
  2. 在某个时间点之后,消息由 GroupByKey 'held'(可能是由于 GroupByKey 之后的积压)。
  3. 这些消息最终退出 GroupByKey(大小为 1 的组)。
  4. 经过短暂的延迟(大约 3 分钟)后,相同的消息再次退出 GroupByKey(仍然是大小为 1 的组)。这可能会发生多次(我怀疑这与他们等待输入 GroupByKey 所花费的时间成正比)。

示例作业 ID 为 2017-10-11_03_50_42-6097948956276262224。我没有 运行 任何其他 运行ner 的光束。

Fusion Breaker 如下:

@Slf4j
public class FusionBreaker<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return group(window(input.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break in")))))
            .apply("Getting iterables after breaking fusion", Values.create())
            .apply("Flattening iterables after breaking fusion", Flatten.iterables())
            .apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break out")));
  }

  private PCollection<T> window(PCollection<T> input) {
    return input.apply("Windowing before breaking fusion", Window.<T>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes());
  }

  private PCollection<KV<Integer, Iterable<T>>> group(PCollection<T> input) {
    return input.apply("Keying with random number", ParDo.of(new RandomKeyFn<>()))
            .apply("Grouping by key to break fusion", GroupByKey.create());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }

}

PassthroughLoggers 只是记录通过的元素(我用它们来确认元素确实重复,而不是计数有问题)。

我怀疑这与 windows/triggers 有关,但我的理解是,无论窗口设置如何,使用 .discardingFiredPanes() 时都不应重复元素。我也尝试过 FixedWindows 但没有成功。

首先,Reshuffle 转换等同于您的 Fusion Breaker,但有一些额外的性能改进,应该更受欢迎。

其次,如果重试,计数器和日志记录都可能多次看到一个元素。如 Beam Execution Model 中所述,如果重试任何融合到其中的元素,则可能会重试该步骤中的元素。

您是否真的观察到管道输出中的重复项?