如何在 Cloud Dataflow 中使用 TextIO.Read 将多个文件与名称匹配
How to match multiple files with names using TextIO.Read in Cloud Dataflow
我有一个 gcs 文件夹如下:
gs://<bucket-name>/<folder-name>/dt=2017-12-01/part-0000.tsv
/dt=2017-12-02/part-0000.tsv
/dt=2017-12-03/part-0000.tsv
/dt=2017-12-04/part-0000.tsv
...
我只想在 Scio 中使用 sc.textFile()
匹配 dt=2017-12-02
和 dt=2017-12-03
下的文件,据我所知,它在下面使用 TextIO.Read.from()
。
我试过了
gs://<bucket-name>/<folder-name>/dt={2017-12-02,2017-12-03}/*.tsv
和
gs://<bucket-name>/<folder-name>/dt=2017-12-(02|03)/*.tsv
都匹配零个文件:
INFO org.apache.beam.sdk.io.FileBasedSource - Filepattern gs://<bucket-name>/<folder-name>/dt={2017-12-02,2017-12-03}/*.tsv matched 0 files with total size 0
INFO org.apache.beam.sdk.io.FileBasedSource - Filepattern gs://<bucket-name>/<folder-name>/dt=2017-12-(02|03)/*.tsv matched 0 files with total size 0
执行此操作的有效文件模式应该是什么?
您需要使用读取 PCollection<String>
文件模式的 TextIO.readAll()
转换。通过 Create.of()
显式创建文件模式集合,或者您可以使用 ParDo
.
计算它
case class ReadPaths(paths: java.lang.Iterable[String]) extends PTransform[PBegin, PCollection[String]] {
override def expand(input: PBegin) = {
Create.of(paths).expand(input).apply(TextIO.readAll())
}
}
val paths = Seq(
"gs://<bucket-name>/<folder-name>/dt=2017-07-01/part-0000.tsv",
"gs://<bucket-name>/<folder-name>/dt=2017-12-20/part-0000.tsv",
"gs://<bucket-name>/<folder-name>/dt=2018-03-29/part-0000.tsv",
"gs://<bucket-name>/<folder-name>/dt=2018-05-04/part-0000.tsv"
)
import scala.collection.JavaConverters._
sc.customInput("Read Paths", ReadPaths(paths.asJava))
这可能有效:
gs://bucket/folder/dt=2017-12-0[12]/*.tsv
参考:https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames
我有一个 gcs 文件夹如下:
gs://<bucket-name>/<folder-name>/dt=2017-12-01/part-0000.tsv
/dt=2017-12-02/part-0000.tsv
/dt=2017-12-03/part-0000.tsv
/dt=2017-12-04/part-0000.tsv
...
我只想在 Scio 中使用 sc.textFile()
匹配 dt=2017-12-02
和 dt=2017-12-03
下的文件,据我所知,它在下面使用 TextIO.Read.from()
。
我试过了
gs://<bucket-name>/<folder-name>/dt={2017-12-02,2017-12-03}/*.tsv
和
gs://<bucket-name>/<folder-name>/dt=2017-12-(02|03)/*.tsv
都匹配零个文件:
INFO org.apache.beam.sdk.io.FileBasedSource - Filepattern gs://<bucket-name>/<folder-name>/dt={2017-12-02,2017-12-03}/*.tsv matched 0 files with total size 0
INFO org.apache.beam.sdk.io.FileBasedSource - Filepattern gs://<bucket-name>/<folder-name>/dt=2017-12-(02|03)/*.tsv matched 0 files with total size 0
执行此操作的有效文件模式应该是什么?
您需要使用读取 PCollection<String>
文件模式的 TextIO.readAll()
转换。通过 Create.of()
显式创建文件模式集合,或者您可以使用 ParDo
.
case class ReadPaths(paths: java.lang.Iterable[String]) extends PTransform[PBegin, PCollection[String]] {
override def expand(input: PBegin) = {
Create.of(paths).expand(input).apply(TextIO.readAll())
}
}
val paths = Seq(
"gs://<bucket-name>/<folder-name>/dt=2017-07-01/part-0000.tsv",
"gs://<bucket-name>/<folder-name>/dt=2017-12-20/part-0000.tsv",
"gs://<bucket-name>/<folder-name>/dt=2018-03-29/part-0000.tsv",
"gs://<bucket-name>/<folder-name>/dt=2018-05-04/part-0000.tsv"
)
import scala.collection.JavaConverters._
sc.customInput("Read Paths", ReadPaths(paths.asJava))
这可能有效:
gs://bucket/folder/dt=2017-12-0[12]/*.tsv
参考:https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames