GCP 中的物联网管道

IoT pipeline in GCP

我在 GCP 中有一个 IoT 管道,其结构如下:

IoT Core -> Pub/Sub -> Dataflow -> BigQuery

我正在使用 esp32 设备发送数据,每 2 秒发送一次新数据。现在我只用 4 台设备进行测试,但最终该项目将由数百个 esp32 设备组成,每个设备每 2 秒发送一次数据。问题是,即使有 4 台设备,订阅中未确认的消息数也会增加到 1260 条消息。即使这些消息没有丢失,它们只是被延迟了,当我不得不使用数百台设备时,这最终可能会导致问题。所以我需要改变我的管道,以便可以成功存储数据而不会出现这种延迟。发送的数据为 csv 格式。它使用 Javascript UDF 在 Dataflow 中转换为 JSON,然后使用 google 定义的模板上传到 Bigquery: Pub/Sub to BigQuery。所有设备都使用相同的 Pub/Sub 主题和订阅。来自所有设备的数据都上传到同一个 BigQuery table。如果有帮助,那么也可以先将数据存储在其他地方,例如云存储(如果速度更快),然后稍后(每隔一小时左右)将所有数据上传到 BigQuery,但最终我需要我所有的数据在 BigQuery 中。请建议我如何改进我的管道。

造成此错误的原因是每 10 秒后 pub/sub 重新发送尚未确认的消息。这导致消息总数迅速增长,因为发送消息的设备数量及其发送速度已经非常高。所以我将这个等待时间增加到 30 秒,系统就平静下来了。现在,当我 运行 管道时,不会形成大量未确认的消息。