BigQuery Streaming 插入错误 - 在数组外添加重复记录

BigQuery Streaming Insert Error - Repeated record added outside of an array

我在使用 Dataflow Streaming Insert 时遇到了一个奇怪的问题。 我有一个包含大量记录和数组的 JSON。我使用 Streaming Insert 方法和 class DeadLetters 设置了管道来处理错误。

formattedWaiting.apply("Insert Bigquery ",
                BigQueryIO.<KV<TableRow,String>>write()
                .to(customOptions.getOutputTable())
                .withFormatFunction(kv -> kv.getKey())
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withSchemaFromView(schema)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withoutValidation()
                .withExtendedErrorInfo()
                .withTimePartitioning(new TimePartitioning().setField(customOptions.getPartitionField().get()))
                .withClustering(clusteringFieldsList)
                .withExtendedErrorInfo())
                .getFailedInsertsWithErr()
                .apply("Taking 1 element insertion", Sample.<BigQueryInsertError>any(1))
                .apply("Insertion errors",ParDo.of(new DeadLettersHandler()));

问题是当我使用流式插入方法时,有些行没有插入到 table 中,我收到错误消息:

名称为 XXXX 的重复记录已添加到数组之外。

我仔细检查了有问题的 JSON,一切正常。 奇怪的是,当我评论 withMethod 行时,该行插入完全没有问题。

我不知道管道为什么会有这种行为。

JSON 看起来像这样。

{
   "parameters":{
      "parameter":[
         {
            "subParameter":[
               {
                  "value":"T",
                  "key":"C"
               },
               {
                  "value":"1",
                  "key":"SEQUENCE_NUMBER"
               },
               {
                  "value":"1",
                  "key":"SEQUENCE_NUMBER"
               }
            ],
            "value":"C",
            "key":"C"
         },
         {
            "subParameter":[
               {
                  "value":"T",
                  "key":"C"
               },
               {
                  "value":"1",
                  "key":"SEQUENCE_NUMBER"
               },
               {
                  "value":"2",
                  "key":"SEQUENCE_NUMBER"
               }
            ],
            "value":"C",
            "key":"C"
         }
      ]
   }
}

BigQuery 架构很好,因为我可以在评论 BigQueryIO 中的流式插入行时插入数据

有什么想法吗?

提前致谢!

只是对这个问题的更新。

问题出在架构声明和 JSON 本身。

我们将 parameters 列定义为 RECORD REPEATED,但 parameters 是 JSON 示例中的一个对象。

所以我们有两个选择。

  1. 将 BigQuery 架构从 RECORD REPEATED 更改为 RECORD NULLABLE
  2. parameters 对象添加括号 [],对于此选项,您必须转换 JSON 并添加括号以将对象视为数组。

示例:

{
   "parameters":[
      {
         "parameter":[
            {
               "subParameter":[
                  {
                     "value":"T",
                     "key":"C"
                  },
                  {
                     "value":"1",
                     "key":"SEQUENCE_NUMBER"
                  },
                  {
                     "value":"1",
                     "key":"SEQUENCE_NUMBER"
                  }
               ],
               "value":"C",
               "key":"C"
            }
         ]
      }
   ]
}