Workflow/Pipeline 特定步骤写入失败

Workflow/Pipeline failing on write for specific step

我们创建了一个管道,它正在从位于 GCS('Clicks'、'Impressions'、'ActiveViews')中的 3 个流执行转换。我们有这样的要求,我们需要将各个流写回 GCS,但要分离文件(稍后加载到 BigQuery 中),因为它们的架构略有不同。

其中一个写入连续两次失败,每次都出现不同的错误,进而导致管道失败。

这些是 GDC 中最后 2 个 workflow/pipeline 的视觉表现,显示失败:

第一个错误:

Feb 21, 2015, 12:55:14 PM (b0cbc05dfc56dbd9): Workflow failed. Causes: (f98c177c56055863): Map task completion for Step "ActiveViews-GSC-write" failed. Causes: (2d838e694976dc6): Expansion failed for filepattern: gs://cdf/binaries/tmp-38156614004ed90e-[0-9][0-9][0-9][0-9][0-9]-of-[0-9][0-9][0-9][0-9][0-9].avro.

第二个错误:

Feb 21, 2015, 1:20:15 PM (19dcdcf1fe125eeb): Workflow failed. Causes: (2a27345ef73673d3): Map task completion for Step "ActiveViews-GSC-write" failed. Causes: (8f79a20dfa5c4d2b): Unable to view metadata for file: gs://cdf/binaries/tmp-2a27345ef7367fe6-00001-of-00015.avro.

它只发生在 "ActiveViews-GCS-Write" 步骤。

知道我们做错了什么吗?

我们找到了解决方法。问题似乎是当指定了多个输入源并使用展平来合并它们时。

对 2 个输入源(例如我们 2 月 1 日至 2 日的所有文件)使用扁平化不起作用(或者我们做错了):

PCollection<String> pc1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_20150201*"); //1st Feb
PCollection<String> pc2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_20150202*"); //2nd Feb
PCollectionList<String> all = PCollectionList.of(pc1).and(pc2);
PCollection<String> flattened = all.apply(Flatten.<String>pCollections());

相反,我们只使用 GLOB(没有展平)并且每次都有效:

pipeline.apply(TextIO.Read.from("gs://<bucket_name>/Files_2015020[12]*");

原始代码很可能遇到两个不同的问题,其中一个已经修复。这两个问题分别与

有关
  1. 通过将集合拼合在一起来组合集合。
  2. 我们如何处理 glob 模式。

带有扁平化的问题编号 1 已修复。解决该问题后,您很可能会遇到第二个问题,即如何处理 glob 模式。

如果您使用扁平化但具有与您在非扁平化情况下使用的类似的 glob,会发生什么情况,例如

PCollection<String> pc1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_2015020[1]*");
PCollection<String> pc2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_2015020[2]*")

在 GCS 中匹配 glob 有点棘手,因为 GCS 列表操作是 eventually consistent