将带有非标准分隔符的 csv 加载到 BQ
Loading csv with non-standard separator into BQ
假设我在 csv 文件中有以下数据:
'"tom","jones","hello,\nMy name is tom"\x01\n"sarah","smith","hello"\x01\n'
行结束符是\x01\n
。是否可以将其直接加载到 GCS 中(无需先预先格式化)?我的思考过程是:
- 使用非标准分隔符(例如
\x00ff
)将其加载到 CSV 中,以便在一行中获取所有数据。
- 然后对 'clean up' 数据执行基本 DML 并重新格式化。
但是,当我们有 运行 行时,我们 运行 遇到了问题,因为 BQ 没有 'support'(如果你想这样称呼它)行排序。这是我的数据现在在 BQ 中的样子:
正如我们所见,行顺序不起作用,因此不可能 'recombine the data',例如,使用 UDF 来获取我们需要的正确 csv 数据。
这里还有其他可能的方法吗?澄清一下,我希望通过 BigQuery 转换 GCS 上已有的 CSV 文件,而不必在加载到 BQ 之前将该文件下载到单独的服务器进行处理。
作为参考,这是我目前使用的代码:
# /tmp/schema_external_nonstandard_csv.json
{
"schema": {
"fields": [
{
"name": "data",
"type": "STRING"
}
]
},
"sourceFormat": "CSV",
"sourceUris": [
"gs://XY-bq/ns.csv"
],
"csvOptions": {
"fieldDelimiter": "\u00ff",
"quote": ""
},
"maxBadRecords": 1000000
}
$ bq mk --external_table_definition=/tmp/schema_external_nonstandard_csv.json datadocs-163219:bqtesting.ns
$ bq query --nouse_legacy_sql 'CREATE TABLE `XY-163219.bqtesting.ns1` AS select * from `XY-163219.bqtesting.ns`'
我想到了一些纯粹的 BigQuery 解决方案:
- 用 bq 指定分隔符。如文档所述,这不适用于此用例 here
"The separator can be any ISO-8859-1 single-byte character."
- 使用
REGEXP_REPLACE
我得到的最好的是单行,里面有一个换行符:
CREATE OR REPLACE TABLE test.separator_final AS
SELECT
REGEXP_REPLACE(data, r"\x01\n", "\n") AS data
FROM
test.separator_external
根据上一点,可以使用 'hack' 将行分解成不同的行(参见答案 here)。但是,需要注意的是,您需要先验地知道拆分的数量,在这种情况下它是不一致的。
您已经在使用但添加行号以便数据可以合并回来的那个。这可能有效,但确保保留行顺序也可能很复杂。
如果我们考虑使用其他 GCP 产品作为 GCS 和 BigQuery 之间的中间人,我们可以找到其他有趣的解决方案:
使用 Dataprep,运行s 数据流在引擎盖下。有一个替换转换 (docs) 并且可以以编程方式生成和调用数据流模板。
使用数据流。我实际上用这个 gist 测试了这个解决方案并且它有效:
我认为可以通过创建模板(自定义分隔符可以是输入参数)并在每次使用 Cloud Functions(NoOps 解决方案)将文件上传到 GCS 时触发它来很好地扩展它。
简而言之,我们使用 TextIO.read().from(file)
从文件中读取记录,其中 file
是 GCS 路径(启动作业时提供 input
和 output
参数)。我们可以使用 withDelimiter()
额外使用一个虚拟分隔符来避免冲突(这里我们再次被限制为单个字节,所以我们不能直接传递真实的)。然后对于每一行,我们用 c.element().split("\\x01\\n")
分隔真正的分隔符。请注意,我们需要转义已经转义的字符(您可以在正常负载的 JSON 查询结果中验证这一点),因此需要四重反斜杠。
p
.apply("GetMessages", TextIO.read().from(file))
.apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String line : c.element().split("\\x01\\n")) {
if (!line.isEmpty()) {
c.output(line);
}
}
}
}))
结果:
请记住,正如@hlagos 所指出的,由于 BigQuery 中的行限制或数据流中分配给单个工作人员的不可拆分步骤,您可能 运行 遇到非常大的单行 CSV 问题.
假设我在 csv 文件中有以下数据:
'"tom","jones","hello,\nMy name is tom"\x01\n"sarah","smith","hello"\x01\n'
行结束符是\x01\n
。是否可以将其直接加载到 GCS 中(无需先预先格式化)?我的思考过程是:
- 使用非标准分隔符(例如
\x00ff
)将其加载到 CSV 中,以便在一行中获取所有数据。 - 然后对 'clean up' 数据执行基本 DML 并重新格式化。
但是,当我们有 运行 行时,我们 运行 遇到了问题,因为 BQ 没有 'support'(如果你想这样称呼它)行排序。这是我的数据现在在 BQ 中的样子:
正如我们所见,行顺序不起作用,因此不可能 'recombine the data',例如,使用 UDF 来获取我们需要的正确 csv 数据。
这里还有其他可能的方法吗?澄清一下,我希望通过 BigQuery 转换 GCS 上已有的 CSV 文件,而不必在加载到 BQ 之前将该文件下载到单独的服务器进行处理。
作为参考,这是我目前使用的代码:
# /tmp/schema_external_nonstandard_csv.json
{
"schema": {
"fields": [
{
"name": "data",
"type": "STRING"
}
]
},
"sourceFormat": "CSV",
"sourceUris": [
"gs://XY-bq/ns.csv"
],
"csvOptions": {
"fieldDelimiter": "\u00ff",
"quote": ""
},
"maxBadRecords": 1000000
}
$ bq mk --external_table_definition=/tmp/schema_external_nonstandard_csv.json datadocs-163219:bqtesting.ns
$ bq query --nouse_legacy_sql 'CREATE TABLE `XY-163219.bqtesting.ns1` AS select * from `XY-163219.bqtesting.ns`'
我想到了一些纯粹的 BigQuery 解决方案:
- 用 bq 指定分隔符。如文档所述,这不适用于此用例 here
"The separator can be any ISO-8859-1 single-byte character."
- 使用
REGEXP_REPLACE
我得到的最好的是单行,里面有一个换行符:
CREATE OR REPLACE TABLE test.separator_final AS
SELECT
REGEXP_REPLACE(data, r"\x01\n", "\n") AS data
FROM
test.separator_external
根据上一点,可以使用 'hack' 将行分解成不同的行(参见答案 here)。但是,需要注意的是,您需要先验地知道拆分的数量,在这种情况下它是不一致的。
您已经在使用但添加行号以便数据可以合并回来的那个。这可能有效,但确保保留行顺序也可能很复杂。
如果我们考虑使用其他 GCP 产品作为 GCS 和 BigQuery 之间的中间人,我们可以找到其他有趣的解决方案:
使用 Dataprep,运行s 数据流在引擎盖下。有一个替换转换 (docs) 并且可以以编程方式生成和调用数据流模板。
使用数据流。我实际上用这个 gist 测试了这个解决方案并且它有效: 我认为可以通过创建模板(自定义分隔符可以是输入参数)并在每次使用 Cloud Functions(NoOps 解决方案)将文件上传到 GCS 时触发它来很好地扩展它。
简而言之,我们使用 TextIO.read().from(file)
从文件中读取记录,其中 file
是 GCS 路径(启动作业时提供 input
和 output
参数)。我们可以使用 withDelimiter()
额外使用一个虚拟分隔符来避免冲突(这里我们再次被限制为单个字节,所以我们不能直接传递真实的)。然后对于每一行,我们用 c.element().split("\\x01\\n")
分隔真正的分隔符。请注意,我们需要转义已经转义的字符(您可以在正常负载的 JSON 查询结果中验证这一点),因此需要四重反斜杠。
p
.apply("GetMessages", TextIO.read().from(file))
.apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String line : c.element().split("\\x01\\n")) {
if (!line.isEmpty()) {
c.output(line);
}
}
}
}))
结果:
请记住,正如@hlagos 所指出的,由于 BigQuery 中的行限制或数据流中分配给单个工作人员的不可拆分步骤,您可能 运行 遇到非常大的单行 CSV 问题.