Python 替代 Java DataFlow 流代码
Python alternative to Java DataFlow Streaming code
我有以下 java 代码片段,它轮询 gcs 存储桶以获取新文件的到达。这是我用于流式传输管道的代码,它会在将一些转换应用到某个目标后进一步加载数据。
PCollection<String> pcollection = pipeline.apply("Read From streaming source",
TextIO.read().from("gs://abc/xyz")
.watchForNewFiles(Duration.standardSeconds(10), Watch.Growth.never()));
但对于特定用例,我需要使用 python 实现相同的目的,为此我无法找到所需的库,也找不到 python 中流媒体管道的所有实现用于 PubSub。 运行 带有 streaming=true
的 python 管道没有解决任何问题,因为代码在完成后退出并且不等待新文件。有人可以建议一种方法吗?
提前致谢。
你需要MatchContinuously
(doc), which was added in this Pull Request.
编辑:
鉴于我看到您在使用 PTransform 时遇到了一些问题(我认为您删除了评论),我将添加一个示例代码:
(p | MatchContinuously("gs://apache-beam-samples/shakespeare/*", interval=10.0)
| Map(lambda x: x.path)
| ReadAllFromText()
| Map(lambda x: logging.info(x))
)
我有以下 java 代码片段,它轮询 gcs 存储桶以获取新文件的到达。这是我用于流式传输管道的代码,它会在将一些转换应用到某个目标后进一步加载数据。
PCollection<String> pcollection = pipeline.apply("Read From streaming source",
TextIO.read().from("gs://abc/xyz")
.watchForNewFiles(Duration.standardSeconds(10), Watch.Growth.never()));
但对于特定用例,我需要使用 python 实现相同的目的,为此我无法找到所需的库,也找不到 python 中流媒体管道的所有实现用于 PubSub。 运行 带有 streaming=true
的 python 管道没有解决任何问题,因为代码在完成后退出并且不等待新文件。有人可以建议一种方法吗?
提前致谢。
你需要MatchContinuously
(doc), which was added in this Pull Request.
编辑:
鉴于我看到您在使用 PTransform 时遇到了一些问题(我认为您删除了评论),我将添加一个示例代码:
(p | MatchContinuously("gs://apache-beam-samples/shakespeare/*", interval=10.0)
| Map(lambda x: x.path)
| ReadAllFromText()
| Map(lambda x: logging.info(x))
)