使用 Appengine 将数据流式传输到 Bigquery

Streaming data to Bigquery using Appengine

我在 BigQuery 中收集数据(从某些网站中安装的 cookie 中获取),使用流式方法和 App Engine 中的 Python 代码。 我用来保存数据的函数如下:

def stream_data(data):
    PROJECT_ID = "project_id"
    DATASET_ID = "dataset_id"

    _SCOPE = 'https://www.googleapis.com/auth/bigquery'

    credentials = appengine.AppAssertionCredentials(scope=_SCOPE)
    http = credentials.authorize(httplib2.Http())
    table = "table_name"
    body = {
            "ignoreUnknownValues": True,                  
            "kind": "bigquery#tableDataInsertAllRequest", 
            "rows": [ 
            { 
                "json": data,
            },
            ]
        }
    bigquery = discovery.build('bigquery', 'v2', http=http)
    bigquery.tabledata().insertAll(projectId=PROJECT_ID, datasetId=DATASET_ID, tableId=table, body=body).execute()

我已经在两个不同的 App Engine 实例上部署了解决方案,但得到了不同的结果。我的问题是:这怎么可能? 另一方面,将结果与 Google 分析指标进行比较,我还注意到并非所有数据都存储在 BigQuery 中。你知道这个问题吗?

使用 insertAll() 方法时,您必须牢记这一点:

Data is streamed temporarily in the streaming buffer which has different availability characteristics than managed storage. Certain operations in BigQuery do not interact with the streaming buffer, such as table copy jobs and API methods like tabledata.list {1}

如果您使用的是 table 预览,流缓冲条目可能不可见。

从您的 table 中执行 SELECT COUNT(*) 应该 return 您的条目总数。

{1}: https://cloud.google.com/bigquery/troubleshooting-errors#missingunavailable-data

在您的代码中,insertAll 操作期间没有查询异常处理。如果 BigQuery 无法写入数据,则不会捕获异常。

在你的最后一行试试这个代码:

bQreturn = bigquery.tabledata().insertAll(projectId=PROJECT_ID, datasetId=DATASET_ID, tableId=table, body=body).execute()
logging.debug(bQreturn)

这样,在Google Cloud Platform log上,可以很容易的找到insertAll操作可能出错的地方。