Google Cloud Data Fusion 未在 GCS 存储桶中生成 CSV 输出

Google Cloud Data Fusion is not producing CSV output in GCS Bucket

我有一个管道递归地从 Google 云存储 (GCS) 存储桶中读取许多 JSON 文件,然后将每个文件解析为一条记录。然后每条记录通过“Python Transform”插件进行进一步处理(添加新字段和值),最后应将其保存在不同的 GCS 存储桶(接收器)中。

我尝试使用 GCS 接收器的参数,并在它之前添加一个“Wrangler”转换,and/or在“Wrangler”转换之前添加一个“CSV Formatter”转换,都没有帮助生成 CSV 文件。预览输出总是正确的,但是部署时,输出不正确。

在我选择的路径中生成的文件始终是我未选择的文件名,并且始终是文件类型“application/octet-stream”。

The first attempt (full pipeline)

The second type of attempt

The third type of attempt

This is the GCS properties window, and nothing in it is different between the above attempts except for the schema.

这是每次的输出:Deployed pipeline output as octet-stream instead of CSV, and with file name I did not choose

我如何选择文件名,生成的输出在 GCS 存储桶中不是 CSV 格式,我做错了什么?

我也复制了这个,我也无法选择我想要的文件的名称和类型。由于接收器中没有可供选择的内容类型选项,因此文件将作为默认文件输出,即 part-r-00000,文件类型为 application/octet-stream.

If the Content-Type is not specified by the uploader and cannot be determined, it is set to application/octet-stream. here

我已经为此创建了一个功能请求,您也可以跟踪进度。

https://issuetracker.google.com/171366470

我同意@narendra 建议的通过 Spark Scala 代码添加文件名的解决方法。

目前GCS sink插件不支持为写入的文件添加名称,因为写入sink的文件可以拆分成多个部分。我们可以添加一个功能请求以获得 GCS 操作,您可以在接收器之后 运行 将文件合并为一个文件并在那里指定名称。

在撰写本文时,在考虑了提出的意见和想法(@narendra、@Edwin、@Rally)后,我尝试了不同的插件,这就是我确定解决方案的方式:

我用的是Spark Sink插件,还有FileDelete插件,可以放在sink之后。

Spark Sink 的代码很简单:

def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
  val fillerVar = "${fillerVar}"
  val fullpath = "gs://somebucket/output/leader_board/"
  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .option("encoding", "UTF-8")
    .mode("append")
    .save(fullpath)

}

输出不仅包括一个 CSV 文件,还包括一个空的“_SUCCESS”文件。这是使用 FileDelete 插件删除的:

我认识到我(现在)找不到一种简单的方法来通过插件更改输出文件名(无论是一个文件还是合并的多个文件)。而且由于我对 Scala/Java 了解不够,所以我也无法弄明白。

出于我的目的,我使用 Google Data Fusion 来生成与 Google Data Studio 一起使用的输出。 Data Studio 不仅可以将单个文件作为数据源,还可以将其指向 GCS 存储桶路径,它会读取其中的所有文件。因此,我不再为无法控制文件名 ("part-00000-[random]).

而烦恼