从数据流 1.9 到 2.0/2.1 的意外行为更改
Unexpected behavior change from dataflow 1.9 to 2.0/2.1
对于一个非常简单的管道,我们发现 Dataflow SDK 1.9 和 2.0/2.1 之间存在非常奇怪的差异。
我们有 CoGroupByKey 步骤,通过键连接两个 PCollections 并输出两个 PCollections(通过 TupleTags)。例如,一个 PCollection 可能包含 {"str1", "str2"},另一个可能包含 {"str3"}。
这两个 PCollection 被写入 GCS(在不同的位置),它们的联合(基本上,通过在两个 PCollection 上应用 Flatten 生成的 PCollection)将被管道中的后续步骤使用。使用前面的例子,我们将{"str1"、"str2"}和{"str3"}存储在GCS中各自的位置下,管道将进一步转换它们的并集(Flattened PCollection){"str1"、"str2"、"str3"}等。
在 Dataflow SDK 1.9 中,这正是正在发生的事情,我们已经围绕这个逻辑构建了我们的管道。
当我们慢慢迁移到 2.0/2.1 时,我们注意到不再观察到这种行为。相反,Flatten 步骤之后的所有步骤都是 运行 正确且符合预期的,但是这两个 PCollections(被 Flattened)不再写入 GCS,就好像它们不存在一样。但是在执行图中,显示了步骤,这对我们来说很奇怪。
我们能够可靠地重现此问题,以便我们可以共享数据和代码作为示例。
我们在 GCS 中存储了两个文本文件:
data1.txt:
k1,v1
k2,v2
data2.txt:
k2,w2
k3,w3
我们将读取这两个文件以创建两个 PCollection,每个文件一个 PC。
我们将解析每一行以创建 KV<String, String>
(因此在本例中键为 k1, k2, k3
)。
然后我们应用 CoGroupByKey 并生成 PCollection 以输出到 GCS。
在 CoGroupByKey 步骤之后会生成两个 PCollection,具体取决于与每个键关联的值的数量(这是一个人为的示例,但它是为了演示我们遇到的问题)——无论数字是偶数还是奇数。
因此,其中一台 PC 将包含键 "k1, " 和 "k3"(附加了一些值字符串,请参见下面的代码),因为它们各有一个值,而另一台将包含一个键 "k2" 因为它有两个值(在每个文件中找到)。
这两个PC在不同的位置写入GCS,两者的扁平化PC也会写入GCS(但可以进一步改造)。
三个输出文件预计包含以下内容(行可能不按顺序):
输出1:
k2: [v2],(w2)
输出2:
k3: (w3)
k1: [v1]
输出合并:
k3: (w3)
k2: [v2],(w2)
k1: [v1]
这正是我们在 Dataflow SDK 1.9 中看到的(和预期的)。
然而,在 2.0 和 2.1 中,output1 和 output2 结果为空(并且 TextIO 步骤甚至没有执行,就好像没有元素输入到它们;我们通过在中间添加一个虚拟 ParDo 来验证这一点,而且它根本没有被调用)。
这让我们很好奇为什么突然在 1.9 和 2.0/2.1 之间进行了这种行为更改,以及什么是我们实现 1.9 一直在做的事情的最佳方式。
具体来说,我们生成 output1/2 用于存档目的,同时我们将两台 PC 展平以进一步转换数据并生成另一个输出。
这是 Java 您可以 运行 的代码(您必须正确导入、更改存储桶名称并正确设置选项等)。
1.9 的工作代码:
//Dataflow SDK 1.9 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1));
output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1));
output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1));
pipeline.run();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.sideOutput(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
2.0/2.1 的工作代码:
// Dataflow SDK 2.0 and 2.1 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1));
output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1));
output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1));
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.output(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
此外,如果它有用,执行图如下所示。
(对于 Google 工程师,还指定了作业 ID)。
使用 1.9(作业 ID 2017-09-29_14_35_42-15149127992051688457):
使用 2.1(作业 ID 2017-09-29_14_31_59-991964669451027883):
TextIO.Write 2,3 在 2.0/2.1 下不产生任何输出。
Flatten,后续步骤正常。
这确实是一个缺陷。修复正在进行中,应该在 Service Release Notes 中记录为可用。
目前的解决方法是使用 1.9.1 SDK,因为此错误仅影响 2.x 个 SDK。
有兴趣尽早获取修复程序的用户也可以使用 Beam 的最新夜间构建(建议解锁开发,而不是用于生产,因为它是每日构建)。 .
对于一个非常简单的管道,我们发现 Dataflow SDK 1.9 和 2.0/2.1 之间存在非常奇怪的差异。
我们有 CoGroupByKey 步骤,通过键连接两个 PCollections 并输出两个 PCollections(通过 TupleTags)。例如,一个 PCollection 可能包含 {"str1", "str2"},另一个可能包含 {"str3"}。
这两个 PCollection 被写入 GCS(在不同的位置),它们的联合(基本上,通过在两个 PCollection 上应用 Flatten 生成的 PCollection)将被管道中的后续步骤使用。使用前面的例子,我们将{"str1"、"str2"}和{"str3"}存储在GCS中各自的位置下,管道将进一步转换它们的并集(Flattened PCollection){"str1"、"str2"、"str3"}等。
在 Dataflow SDK 1.9 中,这正是正在发生的事情,我们已经围绕这个逻辑构建了我们的管道。 当我们慢慢迁移到 2.0/2.1 时,我们注意到不再观察到这种行为。相反,Flatten 步骤之后的所有步骤都是 运行 正确且符合预期的,但是这两个 PCollections(被 Flattened)不再写入 GCS,就好像它们不存在一样。但是在执行图中,显示了步骤,这对我们来说很奇怪。
我们能够可靠地重现此问题,以便我们可以共享数据和代码作为示例。 我们在 GCS 中存储了两个文本文件:
data1.txt:
k1,v1
k2,v2
data2.txt:
k2,w2
k3,w3
我们将读取这两个文件以创建两个 PCollection,每个文件一个 PC。
我们将解析每一行以创建 KV<String, String>
(因此在本例中键为 k1, k2, k3
)。
然后我们应用 CoGroupByKey 并生成 PCollection 以输出到 GCS。 在 CoGroupByKey 步骤之后会生成两个 PCollection,具体取决于与每个键关联的值的数量(这是一个人为的示例,但它是为了演示我们遇到的问题)——无论数字是偶数还是奇数。 因此,其中一台 PC 将包含键 "k1, " 和 "k3"(附加了一些值字符串,请参见下面的代码),因为它们各有一个值,而另一台将包含一个键 "k2" 因为它有两个值(在每个文件中找到)。
这两个PC在不同的位置写入GCS,两者的扁平化PC也会写入GCS(但可以进一步改造)。
三个输出文件预计包含以下内容(行可能不按顺序):
输出1:
k2: [v2],(w2)
输出2:
k3: (w3)
k1: [v1]
输出合并:
k3: (w3)
k2: [v2],(w2)
k1: [v1]
这正是我们在 Dataflow SDK 1.9 中看到的(和预期的)。
然而,在 2.0 和 2.1 中,output1 和 output2 结果为空(并且 TextIO 步骤甚至没有执行,就好像没有元素输入到它们;我们通过在中间添加一个虚拟 ParDo 来验证这一点,而且它根本没有被调用)。
这让我们很好奇为什么突然在 1.9 和 2.0/2.1 之间进行了这种行为更改,以及什么是我们实现 1.9 一直在做的事情的最佳方式。 具体来说,我们生成 output1/2 用于存档目的,同时我们将两台 PC 展平以进一步转换数据并生成另一个输出。
这是 Java 您可以 运行 的代码(您必须正确导入、更改存储桶名称并正确设置选项等)。
1.9 的工作代码:
//Dataflow SDK 1.9 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1));
output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1));
output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1));
pipeline.run();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.sideOutput(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
2.0/2.1 的工作代码:
// Dataflow SDK 2.0 and 2.1 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1));
output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1));
output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1));
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.output(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
此外,如果它有用,执行图如下所示。 (对于 Google 工程师,还指定了作业 ID)。
使用 1.9(作业 ID 2017-09-29_14_35_42-15149127992051688457):
使用 2.1(作业 ID 2017-09-29_14_31_59-991964669451027883):
TextIO.Write 2,3 在 2.0/2.1 下不产生任何输出。 Flatten,后续步骤正常。
这确实是一个缺陷。修复正在进行中,应该在 Service Release Notes 中记录为可用。
目前的解决方法是使用 1.9.1 SDK,因为此错误仅影响 2.x 个 SDK。
有兴趣尽早获取修复程序的用户也可以使用 Beam 的最新夜间构建(建议解锁开发,而不是用于生产,因为它是每日构建)。