如何使用 Google Cloud Platform 数据流的 Pub/Sub 主题 -> BigQuery 模板将嵌套 JSON 数据放入 BigQuery table

How to put nested JSON data into BigQuery table with Google Cloud Platform's dataflow's Pub/Sub Topic -> BigQuery Template

我正在尝试将从 IoT 设备发送的消息存储在 BigQuery table。

云端架构如下:

本地设备 -> json_message -> mqtt_client -> GC 物联网设备 -> 设备注册表 -> Pub/Sub 主题 -> 带有 Pub/Sub 主题的数据流到 BigQuery模板 -> BigQuery Table

我已经让这个系统使用非嵌套的 JSON 消息,它是这样构造的

json_dict = {"instrument": instrument,
 "spectrum": str(self.spectrum),
 "spectrum_creation_time": self.creation_dt.timestamp(),
 "messenger_creation_time": time.time()}

return json.dumps(json_dict)

成功存储此数据的 BigQuery 中的 table 具有以下架构:

   Last modified                  Schema                 Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- ------------------------------------ ------------ ------------- ------------ ------------------- ------------------ --------
  04 Sep 00:24:22   |- instrument: string                1277         81897474
                    |- spectrum: string
                    |- spectrum_creation_time: string
                    |- messenger_creation_time: string

现在我试图让同一个系统使用嵌套的 JSON 消息,其构造如下:

json_dict = {'timestamp': 'AUTO',
             'values': [
                        {'id': instrument + '.Time',
                         'v': time.time(),
                         't': time.time()},
                        {'id': instrument + 'Intensity',
                         'v': str(self.spectrum),
                         't': self.creation_dt.timestamp()}
                        ]}

return json.dumps(json_dict)

我正在尝试使用以下架构将其存储在 BigQuery table 中:

   Last modified               Schema              Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- ------------------------------ ------------ ------------- ------------ ------------------- ------------------ --------
  09 Sep 23:56:20   |- timestamp: timestamp        0            0
                    +- values: record (repeated)
                    |  +- time: record
                    |  |  |- id: string
                    |  |  |- v: string
                    |  |  |- t: timestamp
                    |  +- spectrum: record
                    |  |  |- id: string
                    |  |  |- v: string
                    |  |  |- t: timestamp

不幸的是,当我尝试这种方法时出现以下错误,DataFlow 在 BigQuery 中输出错误 table。

{"errors":[{"debugInfo":"","location":"values[0].v","message":"no such field: v.","reason":"invalid"}],"index":0}
null

解决此问题的最佳方法是什么?我无法更改嵌套的 JSON 结构,因为我正在构建一个测试套件,这是必需的格式。

我在@Miach Kornfield 的帮助下解决了我的问题,他对我的原始问题发表了评论。这是我的解决方案。

我发送到 GCP 的 JSON 数据看起来像

json_dict = {"timestamp": "1631554378.2955232",
             'values': [
                        {"id":"testA.Time",
                         "v": "1631554378.2955232",
                         "t": "1631554378.2955232"},
                        {"id": "testA.Time.Intensity",
                         "v": "[1, 43, 4..]",
                         't': "1631554378.2955232"}
                         ]
            }

我的 bigquery table 的原始模式是

Original schema for bigquery

或文本形式

            Schema
------------------------------
 |- timestamp: timestamp
 +- values: record (repeated)
 |  +- time: record
 |  |  |- id: string
 |  |  |- v: string
 |  |  |- t: timestamp
 |  +- spectrum: record
 |  |  |- id: string
 |  |  |- v: string
 |  |  |- t: timestamp

有效的模式是

schema that worked

或文本形式

            Schema
------------------------------
 |- timestamp: timestamp
 +- values: record (repeated)
 |  |- id: string
 |  |- v: string
 |  |- t: timestamp

通过指示值是记录类型(重复),这意味着它是一个结构数组,具有由子列指定的结构的结构。细节(结构的结构由子列指定)对我来说并不明显,为什么我在解决这个问题时遇到了这么多麻烦。我不确定是否有可能(重复)具有异构模式的记录。