在 PCollection 上使用 python 在数据流中执行 sql 查询

Executing a sql query inside a dataflow using python on PCollection

我正在尝试将一个 sql 查询作为数据流中的转换来实现。我从 bigquery 加载了一个 table 作为 PCollection。我想像下面的查询那样汇总我的数据。

SELECT name, user_id, place, SUM(amount) as some_amount , SUM(cost) as sum_cost FROM [project:test.day_0_test] GROUP BY 1,2,3 我如何轻松实现它。我听说 Java 的数据流支持 P 集合上的 运行 sql 类查询,但正确的是 python 不支持。谁能帮我解决这个问题

注:

我想在 P 集合上实现这个查询..不是直接从 bigquery 读取

(当您评论不想 运行 直接在 BigQuery 中进行 SQL 查询时,我编辑了我的答案)

我模拟了一个文件 input.csv,其中包含:

#input.csv
name1,1,place1,2.,1.5
name1,1,place1,3.,0.5
name1,1,place2,1.,1
name1,2,place3,2.,1.5
name2,2,place3,3.,0.5

这似乎是您从 BQ 检索的数据。您的 SQL 查询可以在 Beam 中实现,例如:

def sum_l(l):                       
    s0, s1 = 0, 0                                         
    for i in range(len(l)):                                        
        s0 += l[i][0]                                                      
        s1 += l[i][1]                
    return [s0, s1] 

with beam.Pipeline(options=po) as p:
     (p | 'Read Input' >> beam.io.ReadFromText("input.csv")
        | 'Split Commas' >> beam.Map(lambda x: x.strip().split(','))
        | 'Prepare Keys' >> beam.Map(lambda x: (x[:-2], map(float, x[-2:])))
        | 'Group Each Key' >> beam.GroupByKey()
        | 'Make Summation' >> beam.Map(lambda x: [x[0], sum_l([e for e in x[1]])])
        | 'Write Results' >> beam.io.WriteToText('results.csv'))

结果是:

#results.csv-00000-of-00001
[[u'name1', u'1', u'place2'], [1.0, 1.0]]
[[u'name1', u'2', u'place3'], [2.0, 1.5]]
[[u'name1', u'1', u'place1'], [5.0, 2.0]]
[[u'name2', u'2', u'place3'], [3.0, 0.5]]

它基本上是查询的直接 MapReduce 实现:为每一行构建一个键,将它们组合在一起,最后的求和发生在使用函数 sum_lMap 操作中。

我不确定为什么要 运行 Beam 而不是 BigQuery 中的查询操作。我建议尝试这两种方法,因为在这种情况下,在 Beam 中可能无法像在 BigQuery 中那样高效。