运行 BigQuery 中的异步查询速度没有明显加快

Running asynchronous queries in BigQuery not noticeably faster

我在 App Engine 上使用 Google 的 python API 客户端库来 运行 Big Query 中的一些查询以生成实时分析。每次调用大约需要两秒钟,有五个查询,这太长了,所以我研究了加快速度的方法并想到 running queries asynchronously would be a solid improvement. The thinking was that I could insert the five queries at once and Google would do some magic to run them all at the same time and then use jobs.getQueryResults(jobId) to get the results for each job. I decided to test the theory out with a proof of concept by timing the execution of two asynchronous queries and comparing it to running queries synchronously。结果:

仅相差0.68秒。因此,虽然异步查询 更快,但它们并没有实现 Google 并行魔术的目标以减少总执行时间。所以第一个问题:对平行魔法的期望是否正确?即使不是,我特别感兴趣的是 Google 声称

An asynchronous query returns a response immediately, generally before the query completes.

大约半秒插入查询不符合我对'immediately'的定义!我想Jordan或Big Query上的其他人团队将是唯一可以回答这个问题的人,但我欢迎任何答案!

编辑注释:

  1. 根据 Mikhail Berlyant 的建议,我从 jobs response 中收集了 creationTimestartTimeendTime 并发现:

    • creationTimestartTime:462 毫秒、387 毫秒(查询 1 和 2 的时间)
    • startTimeendTime:744 毫秒,1005 毫秒

虽然我不确定这是否会增加故事的内容,因为它是发布 insert() 和我想知道的电话完成之间的时间。

  1. 来自BQ's Jobs documentation,我关于平行魔法的第一个问题的答案是肯定的:

You can run multiple jobs concurrently in BigQuery

代码:

为了它的价值,我在本地和生产 App Engine 上测试了它。 Local 慢了大约 2-3 倍,但复制了结果。在我的研究中,我还发现了 partitioned tables,我希望我以前知道它(这很可能最终成为我的解决方案),但这个问题是独立存在的。这是我的代码。我省略了实际的 SQL 因为它们在这种情况下无关紧要:

    def test_sync(self, request):
    t0 = time.time()

    request = bigquery.jobs()
    data = { 'query': (sql) }
    response = request.query(projectId=project_id, body=data).execute()
    t1 = time.time()

    data = { 'query': (sql) }
    response = request.query(projectId=project_id, body=data).execute()
    t2 = time.time()

    print("0-1: " + str(t1 - t0))
    print("1-2: " + str(t2 - t1))
    print("elapsed: " + str(t2 - t0))

def test_async(self, request):
    job_ids = {}

    t0 = time.time()
    job_id = async_query(sql)
    job_ids['a'] = job_id
    print("job_id: " + job_id)
    t1 = time.time()

    job_id = async_query(sql)
    job_ids['b'] = job_id
    print("job_id: " + job_id)
    t2 = time.time()

    for key, value in job_ids.iteritems():

        response = bigquery.jobs().getQueryResults(
            jobId=value,
            projectId=project_id).execute()

    t3 = time.time()
    print("0-1: " + str(t1 - t0))
    print("1-2: " + str(t2 - t1))
    print("2-3: " + str(t3 - t2))
    print("elapsed: " + str(t3 - t0))

def async_query(sql):
    job_data = {
        'jobReference': {
            'projectId': project_id
        },
        'configuration': {
            'query': {
                'query': sql,
                'priority': 'INTERACTIVE'
            }
        }
    }

response = bigquery.jobs().insert(
    projectId=project_id,
    body=job_data).execute()
job_id = response['jobReference']['jobId']

return job_id

运行并行查询是否会加快结果的答案当然是"it depends"。

当您使用异步作业 API 时,每个查询都会有大约半秒的内置延迟。这是因为 API 不是为短查询而设计的 运行;如果您的查询 运行 在一两秒内完成,则不需要异步处理。

未来半秒延迟可能会下降,但有许多固定成本不会变得更好。例如,您要向 google 而不是一个发送两个 HTTP 请求。这些需要多长时间取决于您从何处发送请求以及您使用的网络的特征。如果您在美国,往返时间可能只有几毫秒,但如果您在巴西,则可能需要 100 毫秒。

此外,当您执行 jobs.query() 时,接收请求的 BigQuery API 服务器与启动查询的服务器相同。它可以在查询完成后立即 return 结果。但是当您使用异步 api 时,您的 getQueryResults() 请求将转到不同的服务器。该服务器必须轮询作业状态或找到正在 运行 请求以获取状态的服务器。这需要时间。

因此,如果您 运行 并行执行一堆查询,每个查询需要 1-2 秒,但每个查询都增加了半秒,再加上它需要半秒在初始请求中的第二个,您不太可能看到很多加速。另一方面,如果您的查询每次花费 5 或 10 秒,则固定开销占总时间的百分比会更小。

我的猜测是,如果您 运行 并行查询的数量更多,您会看到更多的加速。另一种选择是使用 API 的同步版本,但在客户端使用多个线程并行发送多个请求。

还有一个注意事项,那就是查询大小。除非您购买额外的容量,否则默认情况下,BigQuery 会为您的所有查询提供 2000 "slots"。槽是可以并行完成的工作单元。您可以将这 2000 个槽用于 运行 一个巨大的查询,或 20 个较小的查询,每个查询一次使用 100 个槽。如果您 运行 使 2000 个槽饱和的并行查询,您将遇到减速。

也就是说,2000 个插槽很多。粗略估计,2000 个插槽每秒可以处理数百 GB 的数据。因此,除非您通过 BigQuery 推动这种量,否则添加并行查询不太可能减慢您的速度。