apache beam streaming pipeline 观看 gcs 文件正则表达式
apache beam streaming pipeline to watch gcs file regex
我有一个流式光束管道,我试图在其中监视多个 globs/regex 模式。这些模式中很少有文件匹配,并且将来会生成很少的模式。
PCollection<String> fileGlobs = p.apply(Create.of(filePatterns));
PCollection<Metadata> f = fileGlobs.apply("MatchALL",
FileIO.matchAll().continuously(
Duration.standardSeconds(10),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
f = .. some more transformations and then write to gcs ..
预期的行为是将现有文件与提供的模式相匹配,并监视它们以查看是否正在将与这些模式匹配的新文件写入 GCS。我强制执行的终止条件是,如果最后生成的与特定模式匹配的文件是一个多小时前生成的,则不要尝试匹配模式。
观察到的行为是我们匹配了很多文件,但是在获得无界 f 之后的转换根本没有被执行。日志只显示
polling returned 681384 results, of which 681384 were new. The output is incomplete.
我给出了 2 个不同的正则表达式模式供观察。现有的正则表达式模式之一已经匹配了约 500k 文件,并且每分钟都在添加更多文件,我从未看到输出和上面的日志行。第二个正则表达式模式匹配 0 个文件(启动管道时),但一旦在未来的某个时间点它开始与新来的文件匹配,这些输出文件就会被写入 gcs。
有人可以解释这种行为吗?如果我连续正确地使用匹配。我没有在这里创建任何 windows 因为我的用例非常简单,流文件 -> 读取文件 -> 过滤一些事件 -> 将这些文件写回到一些 gcs 桶。
这是 Splittable DoFn 中的一个错误,它会影响 Watch
转换,以防单轮轮询花费超过 10 秒 - 当观看与大量文件。该错误导致不产生任何输出,因为转换在取得任何进展之前得到了检查点,因此当它从检查点恢复时,它在某种意义上是 "back to square 1"。
请关注 JIRA 以获取更新和建议的解决方法。
我有一个流式光束管道,我试图在其中监视多个 globs/regex 模式。这些模式中很少有文件匹配,并且将来会生成很少的模式。
PCollection<String> fileGlobs = p.apply(Create.of(filePatterns));
PCollection<Metadata> f = fileGlobs.apply("MatchALL",
FileIO.matchAll().continuously(
Duration.standardSeconds(10),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
f = .. some more transformations and then write to gcs ..
预期的行为是将现有文件与提供的模式相匹配,并监视它们以查看是否正在将与这些模式匹配的新文件写入 GCS。我强制执行的终止条件是,如果最后生成的与特定模式匹配的文件是一个多小时前生成的,则不要尝试匹配模式。 观察到的行为是我们匹配了很多文件,但是在获得无界 f 之后的转换根本没有被执行。日志只显示
polling returned 681384 results, of which 681384 were new. The output is incomplete.
我给出了 2 个不同的正则表达式模式供观察。现有的正则表达式模式之一已经匹配了约 500k 文件,并且每分钟都在添加更多文件,我从未看到输出和上面的日志行。第二个正则表达式模式匹配 0 个文件(启动管道时),但一旦在未来的某个时间点它开始与新来的文件匹配,这些输出文件就会被写入 gcs。
有人可以解释这种行为吗?如果我连续正确地使用匹配。我没有在这里创建任何 windows 因为我的用例非常简单,流文件 -> 读取文件 -> 过滤一些事件 -> 将这些文件写回到一些 gcs 桶。
这是 Splittable DoFn 中的一个错误,它会影响 Watch
转换,以防单轮轮询花费超过 10 秒 - 当观看与大量文件。该错误导致不产生任何输出,因为转换在取得任何进展之前得到了检查点,因此当它从检查点恢复时,它在某种意义上是 "back to square 1"。
请关注 JIRA 以获取更新和建议的解决方法。