将带有非标准分隔符的 csv 加载到 BQ

Loading csv with non-standard separator into BQ

假设我在 csv 文件中有以下数据:

'"tom","jones","hello,\nMy name is tom"\x01\n"sarah","smith","hello"\x01\n'

行结束符是\x01\n。是否可以将其直接加载到 GCS 中(无需先预先格式化)?我的思考过程是:

但是,当我们有 运行 行时,我们 运行 遇到了问题,因为 BQ 没有 'support'(如果你想这样称呼它)行排序。这是我的数据现在在 BQ 中的样子:

正如我们所见,行顺序不起作用,因此不可能 'recombine the data',例如,使用 UDF 来获取我们需要的正确 csv 数据。

这里还有其他可能的方法吗?澄清一下,我希望通过 BigQuery 转换 GCS 上已有的 CSV 文件,而不必在加载到 BQ 之前将该文件下载到单独的服务器进行处理。


作为参考,这是我目前使用的代码:

# /tmp/schema_external_nonstandard_csv.json
{
  "schema": {
    "fields": [
      {
        "name": "data",
        "type": "STRING"
      }
    ]
  },
  "sourceFormat": "CSV",
  "sourceUris": [
    "gs://XY-bq/ns.csv"
  ],
  "csvOptions": {
    "fieldDelimiter": "\u00ff",
    "quote": ""
  },
  "maxBadRecords": 1000000
}

$ bq mk --external_table_definition=/tmp/schema_external_nonstandard_csv.json datadocs-163219:bqtesting.ns
$ bq query --nouse_legacy_sql 'CREATE TABLE `XY-163219.bqtesting.ns1` AS select * from `XY-163219.bqtesting.ns`'

我想到了一些纯粹的 BigQuery 解决方案:

  1. 用 bq 指定分隔符。如文档所述,这不适用于此用例 here

"The separator can be any ISO-8859-1 single-byte character."

  1. 使用 REGEXP_REPLACE 我得到的最好的是单行,里面有一个换行符:
CREATE OR REPLACE TABLE test.separator_final AS
SELECT
  REGEXP_REPLACE(data, r"\x01\n", "\n") AS data
FROM
  test.separator_external

  1. 根据上一点,可以使用 'hack' 将行分解成不同的行(参见答案 here)。但是,需要注意的是,您需要先验地知道拆分的数量,在这种情况下它是不一致的。

  2. 您已经在使用但添加行号以便数据可以合并回来的那个。这可能有效,但确保保留行顺序也可能很复杂。

如果我们考虑使用其他 GCP 产品作为 GCS 和 BigQuery 之间的中间人,我们可以找到其他有趣的解决方案:

  1. 使用 Dataprep,运行s 数据流在引擎盖下。有一个替换转换 (docs) 并且可以以编程方式生成和调用数据流模板。

  2. 使用数据流。我实际上用这个 gist 测试了这个解决方案并且它有效: 我认为可以通过创建模板(自定义分隔符可以是输入参数)并在每次使用 Cloud Functions(NoOps 解决方案)将文件上传到 GCS 时触发它来很好地扩展它。

简而言之,我们使用 TextIO.read().from(file) 从文件中读取记录,其中 file 是 GCS 路径(启动作业时提供 inputoutput 参数)。我们可以使用 withDelimiter() 额外使用一个虚拟分隔符来避免冲突(这里我们再次被限制为单个字节,所以我们不能直接传递真实的)。然后对于每一行,我们用 c.element().split("\\x01\\n") 分隔真正的分隔符。请注意,我们需要转义已经转义的字符(您可以在正常负载的 JSON 查询结果中验证这一点),因此需要四重反斜杠。

p
    .apply("GetMessages", TextIO.read().from(file))
        .apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (String line : c.element().split("\\x01\\n")) {
            if (!line.isEmpty()) {
              c.output(line);
            }
          }
        }
    }))

结果:

请记住,正如@hlagos 所指出的,由于 BigQuery 中的行限制或数据流中分配给单个工作人员的不可拆分步骤,您可能 运行 遇到非常大的单行 CSV 问题.