Dataflow - Write to BigQuery - Tuple to Dictionary

I'm trying to get the last transform of my Beam pipeline working (using Google Dataflow as the runner, but testing with the DirectRunner) and I'm struggling to understand how to change my rows from tuples to dictionaries. I tried a ParDo with a custom class that converts the tuple to a dictionary. The debug output I added to the TupToDict function prints the following, so I think it should be working? Any help is appreciated... I'm tearing my hair out :). For testing I've switched to the DirectRunner. I've also 'xxxxx'-ed out the project information etc.

Output from the TupToDict function:

(u'A3VZVMI3JWCFPD', 5.0)
product_id= A3VZVMI3JWCFPD
avg_rating= 5.0
{'avg_rating': 5.0, 'product_id': u'A3VZVMI3JWCFPD'}

Output in strings.txt from the DirectRunner:

avg_rating
product_id
avg_rating
product_id
avg_rating
product_id
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    import sys
    import logging

    PROJECT='xxxx'
    BUCKET='xxxxxxxx'
    schema = 'product_id:STRING,avg_rating:FLOAT'

    class Split(beam.DoFn):
        """This class returns each row as a dictionary."""
        def process(self, element):
            user_id, product_id, rating = element.split(",")

            return [{
                'user_id': user_id,
                'product_id': product_id,
                'rating': float(rating),
            }]


    class CollectProductID(beam.DoFn):
        def process(self, element):
            # Returns a list of (product_id, rating) tuples
            result = [(element['product_id'], element['rating'])]
            return result


    class TupToDict(beam.DoFn):
        def process(self, element):
            print element
            print "product_id=", element[0]
            print "avg_rating=", element[1]
            di = {'product_id': element[0], 'avg_rating': element[1]}
            print di
            return di


    def run():
        argv = [
            '--project={0}'.format(PROJECT),
            '--staging_location=gs://{0}/'.format(BUCKET),
            '--temp_location=gs://{0}/'.format(BUCKET),
            #'--runner=DataflowRunner'
            '--runner=DirectRunner'
        ]

        p = beam.Pipeline(argv=argv)

        (p
            | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/xxxxx_10k.csv'.format(BUCKET))
            # The next transform splits the read CSV lines into rows of dictionaries
            | 'ParseCSV' >> beam.ParDo(Split())
            # The next transform breaks each row into a tuple, and the one after groups by product ID
            | 'CombineProduct' >> beam.ParDo(CollectProductID())
            | 'Grouping Product IDs' >> beam.GroupByKey()
            # The next transform averages the ratings per product_id
            | 'Calculating Average' >> beam.CombineValues(beam.combiners.MeanCombineFn())
            # The next transform converts to a dictionary
            | 'Convert To Dict' >> beam.ParDo(TupToDict())
            # The next transform maps keys to BigQuery columns
            #| 'Map to BQ COL' >> beam.Map(lambda line: dict(record=line))
            # The next two toggle between writing locally and writing to BQ
            | 'Write to file' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
            #| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:xxxxxx.ecomm_product_ratings_avg'.format(PROJECT), schema=schema)
        )

        p.run()

    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run()

Can you try yielding your dictionary instead of returning it?

    def process(self, element):
        print(element)
        print("product_id=", element[0])
        print("avg_rating=", element[1])
        di = {'product_id': element[0], 'avg_rating': element[1]}
        print(di)
        yield di

A process function must return an iterable, which means that if you return a dictionary, Beam will iterate over the dictionary itself; that is why only the keys end up in your output. If you yield your dictionary instead, process returns a generator containing your dictionary as an element.
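
To make the difference concrete, here is a minimal sketch (plain Python, no Beam required; the values are just the ones from the debug output above) of what iterating each form of the result produces:

    # Beam iterates whatever process() returns and emits each item.
    di = {'product_id': u'A3VZVMI3JWCFPD', 'avg_rating': 5.0}

    print(list(di))    # ['product_id', 'avg_rating'] -- returning the dict emits only its keys
    print(list([di]))  # [{'product_id': u'A3VZVMI3JWCFPD', 'avg_rating': 5.0}] -- the dict itself

    def gen():
        yield di       # yielding produces a generator with the dict as its one element

    print(list(gen())) # same result as wrapping the dict in a list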

Zaman is a hero. I needed to add [] in my TupToDict function to return the actual dictionary.
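
For completeness, a sketch of TupToDict with that fix applied (the [] wraps the dict in a single-element list, so process() returns an iterable whose one element is the dict):

    class TupToDict(beam.DoFn):
        def process(self, element):
            # element is a (product_id, avg_rating) tuple from CombineValues
            di = {'product_id': element[0], 'avg_rating': element[1]}
            # Wrapping in a list makes Beam emit the dict itself, not its keys
            return [di]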

I changed the runner back to Dataflow and it went into BQ without a hitch! Thanks!
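
For reference, that switch amounts to flipping two toggles in the script above: the runner flag and the final sink. A sketch of just those lines (a fragment of the pipeline above, reusing the question's placeholder table name and the schema defined at the top):

    # In argv, use the Dataflow runner instead of the direct runner:
    '--runner=DataflowRunner'

    # ...and at the end of the pipeline, swap the text sink for the BigQuery sink:
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        '{0}:xxxxxx.ecomm_product_ratings_avg'.format(PROJECT),
        schema=schema)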