从数据流 1.9 到 2.0/2.1 的意外行为更改

Unexpected behavior change from dataflow 1.9 to 2.0/2.1

对于一个非常简单的管道,我们发现 Dataflow SDK 1.9 和 2.0/2.1 之间存在非常奇怪的差异。

我们有 CoGroupByKey 步骤,通过键连接两个 PCollections 并输出两个 PCollections(通过 TupleTags)。例如,一个 PCollection 可能包含 {"str1", "str2"},另一个可能包含 {"str3"}。

这两个 PCollection 被写入 GCS(在不同的位置),它们的联合(基本上,通过在两个 PCollection 上应用 Flatten 生成的 PCollection)将被管道中的后续步骤使用。使用前面的例子,我们将{"str1"、"str2"}和{"str3"}存储在GCS中各自的位置下,管道将进一步转换它们的并集(Flattened PCollection){"str1"、"str2"、"str3"}等。

在 Dataflow SDK 1.9 中,这正是正在发生的事情,我们已经围绕这个逻辑构建了我们的管道。 当我们慢慢迁移到 2.0/2.1 时,我们注意到不再观察到这种行为。相反,Flatten 步骤之后的所有步骤都是 运行 正确且符合预期的,但是这两个 PCollections(被 Flattened)不再写入 GCS,就好像它们不存在一样。但是在执行图中,显示了步骤,这对我们来说很奇怪。

我们能够可靠地重现此问题,以便我们可以共享数据和代码作为示例。 我们在 GCS 中存储了两个文本文件:

data1.txt:

k1,v1
k2,v2

data2.txt:

k2,w2
k3,w3

我们将读取这两个文件以创建两个 PCollection,每个文件一个 PC。 我们将解析每一行以创建 KV<String, String>(因此在本例中键为 k1, k2, k3)。

然后我们应用 CoGroupByKey 并生成 PCollection 以输出到 GCS。 在 CoGroupByKey 步骤之后会生成两个 PCollection,具体取决于与每个键关联的值的数量(这是一个人为的示例,但它是为了演示我们遇到的问题)——无论数字是偶数还是奇数。 因此,其中一台 PC 将包含键 "k1, " 和 "k3"(附加了一些值字符串,请参见下面的代码),因为它们各有一个值,而另一台将包含一个键 "k2" 因为它有两个值(在每个文件中找到)。

这两个PC在不同的位置写入GCS,两者的扁平化PC也会写入GCS(但可以进一步改造)。

三个输出文件预计包含以下内容(行可能不按顺序):

输出1:

k2: [v2],(w2)

输出2:

k3: (w3)
k1: [v1]

输出合并:

k3: (w3)
k2: [v2],(w2)
k1: [v1]

这正是我们在 Dataflow SDK 1.9 中看到的(和预期的)。

然而,在 2.0 和 2.1 中,output1 和 output2 结果为空(并且 TextIO 步骤甚至没有执行,就好像没有元素输入到它们;我们通过在中间添加一个虚拟 ParDo 来验证这一点,而且它根本没有被调用)。

这让我们很好奇为什么突然在 1.9 和 2.0/2.1 之间进行了这种行为更改,以及什么是我们实现 1.9 一直在做的事情的最佳方式。 具体来说,我们生成 output1/2 用于存档目的,同时我们将两台 PC 展平以进一步转换数据并生成另一个输出。

这是 Java 您可以 运行 的代码(您必须正确导入、更改存储桶名称并正确设置选项等)。

1.9 的工作代码:

//Dataflow SDK 1.9 compatible.
public class TestJob {
  public static void execute(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    PCollection<KV<String, String>> data1 =
        pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));

    PCollection<KV<String, String>> data2 =
        pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));

    TupleTag<String> inputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    TupleTag<String> outputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
        .apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
            .withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
    PCollection<String> output1 = tuple.get(outputTag1);
    PCollection<String> output2 = tuple.get(outputTag2);
    PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());

    outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1));
    output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1));
    output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1));

    pipeline.run();
  }

  static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
    private static final long serialVersionUID = 1L;

    final TupleTag<String> inputTag1;
    final TupleTag<String> inputTag2;
    final TupleTag<String> outputTag2;

    public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
      this.inputTag1 = inputTag1;
      this.inputTag2 = inputTag2;
      this.outputTag2 = outputTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String key = c.element().getKey();
      List<String> values = new ArrayList<String>();
      int numValues = 0;
      for (String val1 : c.element().getValue().getAll(inputTag1)) {
        values.add(String.format("[%s]", val1));
        numValues++;
      }
      for (String val2 : c.element().getValue().getAll(inputTag2)) {
        values.add(String.format("(%s)", val2));
        numValues++;
      }
      final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
      if (numValues % 2 == 0) {
        c.output(line);
      } else {
        c.sideOutput(outputTag2, line);
      }
    }
  }

  static class doFn extends DoFn<String, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String[] tokens = c.element().split(",");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }
}

2.0/2.1 的工作代码:

// Dataflow SDK 2.0 and 2.1 compatible.
public class TestJob {
  public static void execute(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    PCollection<KV<String, String>> data1 =
        pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));

    PCollection<KV<String, String>> data2 =
        pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));

    TupleTag<String> inputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    TupleTag<String> outputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
        .apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
            .withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
    PCollection<String> output1 = tuple.get(outputTag1);
    PCollection<String> output2 = tuple.get(outputTag2);
    PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());

    outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1));
    output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1));
    output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1));

    PipelineResult pipelineResult = pipeline.run();
    pipelineResult.waitUntilFinish();

  }

  static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
    private static final long serialVersionUID = 1L;

    final TupleTag<String> inputTag1;
    final TupleTag<String> inputTag2;
    final TupleTag<String> outputTag2;

    public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
      this.inputTag1 = inputTag1;
      this.inputTag2 = inputTag2;
      this.outputTag2 = outputTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String key = c.element().getKey();
      List<String> values = new ArrayList<String>();
      int numValues = 0;
      for (String val1 : c.element().getValue().getAll(inputTag1)) {
        values.add(String.format("[%s]", val1));
        numValues++;
      }
      for (String val2 : c.element().getValue().getAll(inputTag2)) {
        values.add(String.format("(%s)", val2));
        numValues++;
      }
      final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
      if (numValues % 2 == 0) {
        c.output(line);
      } else {
        c.output(outputTag2, line);
      }
    }
  }

  static class doFn extends DoFn<String, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String[] tokens = c.element().split(",");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }
}

此外,如果它有用,执行图如下所示。 (对于 Google 工程师,还指定了作业 ID)。

使用 1.9(作业 ID 2017-09-29_14_35_42-15149127992051688457):

使用 2.1(作业 ID 2017-09-29_14_31_59-991964669451027883):

TextIO.Write 2,3 在 2.0/2.1 下不产生任何输出。 Flatten,后续步骤正常。

这确实是一个缺陷。修复正在进行中,应该在 Service Release Notes 中记录为可用。

目前的解决方法是使用 1.9.1 SDK,因为此错误仅影响 2.x 个 SDK。

有兴趣尽早获取修复程序的用户也可以使用 Beam 的最新夜间构建(建议解锁开发,而不是用于生产,因为它是每日构建)。 .