BigQuery Streaming 插入错误 - 在数组外添加重复记录
BigQuery Streaming Insert Error - Repeated record added outside of an array
我在使用 Dataflow Streaming Insert 时遇到了一个奇怪的问题。
我有一个包含大量记录和数组的 JSON。我使用 Streaming Insert 方法和 class DeadLetters 设置了管道来处理错误。
formattedWaiting.apply("Insert Bigquery ",
BigQueryIO.<KV<TableRow,String>>write()
.to(customOptions.getOutputTable())
.withFormatFunction(kv -> kv.getKey())
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchemaFromView(schema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withoutValidation()
.withExtendedErrorInfo()
.withTimePartitioning(new TimePartitioning().setField(customOptions.getPartitionField().get()))
.withClustering(clusteringFieldsList)
.withExtendedErrorInfo())
.getFailedInsertsWithErr()
.apply("Taking 1 element insertion", Sample.<BigQueryInsertError>any(1))
.apply("Insertion errors",ParDo.of(new DeadLettersHandler()));
问题是当我使用流式插入方法时,有些行没有插入到 table 中,我收到错误消息:
名称为 XXXX 的重复记录已添加到数组之外。
我仔细检查了有问题的 JSON,一切正常。
奇怪的是,当我评论 withMethod 行时,该行插入完全没有问题。
我不知道管道为什么会有这种行为。
JSON 看起来像这样。
{
"parameters":{
"parameter":[
{
"subParameter":[
{
"value":"T",
"key":"C"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
}
],
"value":"C",
"key":"C"
},
{
"subParameter":[
{
"value":"T",
"key":"C"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
},
{
"value":"2",
"key":"SEQUENCE_NUMBER"
}
],
"value":"C",
"key":"C"
}
]
}
}
BigQuery 架构很好,因为我可以在评论 BigQueryIO 中的流式插入行时插入数据
有什么想法吗?
提前致谢!
只是对这个问题的更新。
问题出在架构声明和 JSON 本身。
我们将 parameters
列定义为 RECORD REPEATED
,但 parameters
是 JSON 示例中的一个对象。
所以我们有两个选择。
- 将 BigQuery 架构从
RECORD REPEATED
更改为 RECORD NULLABLE
- 向
parameters
对象添加括号 [],对于此选项,您必须转换 JSON 并添加括号以将对象视为数组。
示例:
{
"parameters":[
{
"parameter":[
{
"subParameter":[
{
"value":"T",
"key":"C"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
}
],
"value":"C",
"key":"C"
}
]
}
]
}
我在使用 Dataflow Streaming Insert 时遇到了一个奇怪的问题。 我有一个包含大量记录和数组的 JSON。我使用 Streaming Insert 方法和 class DeadLetters 设置了管道来处理错误。
formattedWaiting.apply("Insert Bigquery ",
BigQueryIO.<KV<TableRow,String>>write()
.to(customOptions.getOutputTable())
.withFormatFunction(kv -> kv.getKey())
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchemaFromView(schema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withoutValidation()
.withExtendedErrorInfo()
.withTimePartitioning(new TimePartitioning().setField(customOptions.getPartitionField().get()))
.withClustering(clusteringFieldsList)
.withExtendedErrorInfo())
.getFailedInsertsWithErr()
.apply("Taking 1 element insertion", Sample.<BigQueryInsertError>any(1))
.apply("Insertion errors",ParDo.of(new DeadLettersHandler()));
问题是当我使用流式插入方法时,有些行没有插入到 table 中,我收到错误消息:
名称为 XXXX 的重复记录已添加到数组之外。
我仔细检查了有问题的 JSON,一切正常。 奇怪的是,当我评论 withMethod 行时,该行插入完全没有问题。
我不知道管道为什么会有这种行为。
JSON 看起来像这样。
{
"parameters":{
"parameter":[
{
"subParameter":[
{
"value":"T",
"key":"C"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
}
],
"value":"C",
"key":"C"
},
{
"subParameter":[
{
"value":"T",
"key":"C"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
},
{
"value":"2",
"key":"SEQUENCE_NUMBER"
}
],
"value":"C",
"key":"C"
}
]
}
}
BigQuery 架构很好,因为我可以在评论 BigQueryIO 中的流式插入行时插入数据
有什么想法吗?
提前致谢!
只是对这个问题的更新。
问题出在架构声明和 JSON 本身。
我们将 parameters
列定义为 RECORD REPEATED
,但 parameters
是 JSON 示例中的一个对象。
所以我们有两个选择。
- 将 BigQuery 架构从
RECORD REPEATED
更改为RECORD NULLABLE
- 向
parameters
对象添加括号 [],对于此选项,您必须转换 JSON 并添加括号以将对象视为数组。
示例:
{
"parameters":[
{
"parameter":[
{
"subParameter":[
{
"value":"T",
"key":"C"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
},
{
"value":"1",
"key":"SEQUENCE_NUMBER"
}
],
"value":"C",
"key":"C"
}
]
}
]
}