Dataflow - Write to BigQuery - Tuple to Dictionary
I'm trying to get the last transform of my Beam pipeline working (Google Dataflow will be the runner, but I'm testing with the DirectRunner) and I'm struggling to understand how to change my rows from tuples to dictionaries. I tried running a ParDo with a custom class that converts the tuple to a dictionary. The debug output I added to the TupToDict function prints the following, so I thought it should work? Any help appreciated... hair is being torn out :). For testing I have switched to the DirectRunner. I've also 'xxxxx'-ed out the project info etc.
Output from the TupToDict function:
```
(u'A3VZVMI3JWCFPD', 5.0)
product_id= A3VZVMI3JWCFPD
avg_rating= 5.0
{'avg_rating': 5.0, 'product_id': u'A3VZVMI3JWCFPD'}
```
Output from the DirectRunner in strings.txt:
```
avg_rating
product_id
avg_rating
product_id
avg_rating
product_id
```
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import sys
import logging

PROJECT = 'xxxx'
BUCKET = 'xxxxxxxx'
schema = 'product_id:STRING,avg_rating:FLOAT'

class Split(beam.DoFn):
    #
    # This class returns each row as a dictionary
    #
    def process(self, element):
        user_id, product_id, rating = element.split(",")
        return [{
            'user_id': user_id,
            'product_id': product_id,
            'rating': float(rating),
        }]

class CollectProductID(beam.DoFn):
    def process(self, element):
        # returns a list of tuples containing product_id and rating
        result = [(element['product_id'], element['rating'])]
        return result

class TupToDict(beam.DoFn):
    def process(self, element):
        print element
        print "product_id=", element[0]
        print "avg_rating=", element[1]
        di = {'product_id': element[0], 'avg_rating': element[1]}
        print di
        return di

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/'.format(BUCKET),
        '--temp_location=gs://{0}/'.format(BUCKET),
        #'--runner=DataflowRunner'
        '--runner=DirectRunner'
    ]
    p = beam.Pipeline(argv=argv)
    (p
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/xxxxx_10k.csv'.format(BUCKET))
     # Next Transform splits the read PCollection CSV into rows of dictionaries
     | 'ParseCSV' >> beam.ParDo(Split())
     # Next Transform breaks each row into a tuple and the following one groups by product IDs
     | 'CombineProduct' >> beam.ParDo(CollectProductID())
     | 'Grouping Product IDs' >> beam.GroupByKey()
     # Next Transform averages the ratings per product_id
     | 'Calculating Average' >> beam.CombineValues(beam.combiners.MeanCombineFn())
     # Next Transform converts to a dictionary
     | 'Convert To Dict' >> beam.ParDo(TupToDict())
     # Next Transform maps keys to BigQuery columns
     #| 'Map to BQ COL' >> beam.Map(lambda line: dict(record=line))
     # Next two toggle between writing locally and writing to BQ
     | 'Write to file' >> beam.io.WriteToText('strings', file_name_suffix='.txt'))
     #| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:xxxxxx.ecomm_product_ratings_avg'.format(PROJECT), schema=schema))
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```
Can you try yielding your dictionary instead of returning it:
```python
def process(self, element):
    print(element)
    print("product_id=", element[0])
    print("avg_rating=", element[1])
    di = {'product_id': element[0], 'avg_rating': element[1]}
    print(di)
    yield di
```
A process function must return an iterable, which means that if you return a dictionary, Beam will iterate over the dictionary; that is why only the keys end up in your output. If you yield your dictionary instead, process returns a generator that contains your dictionary as an element.
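To make the difference concrete, here is a minimal sketch (runnable with the DirectRunner under Python 3; the sample element is taken from the debug output above) contrasting the two behaviors:

```python
import apache_beam as beam

class ReturnsDict(beam.DoFn):
    def process(self, element):
        # Beam iterates over whatever process returns; iterating a dict
        # yields its keys, so downstream only sees the key strings.
        return {'product_id': element[0], 'avg_rating': element[1]}

class YieldsDict(beam.DoFn):
    def process(self, element):
        # Yielding turns process into a generator whose single element
        # is the dict, so downstream sees the dict intact.
        yield {'product_id': element[0], 'avg_rating': element[1]}

with beam.Pipeline() as p:
    rows = p | beam.Create([('A3VZVMI3JWCFPD', 5.0)])
    rows | 'bad' >> beam.ParDo(ReturnsDict()) | 'show bad' >> beam.Map(print)
    # prints the keys only: product_id, avg_rating
    rows | 'good' >> beam.ParDo(YieldsDict()) | 'show good' >> beam.Map(print)
    # prints the whole dict: {'product_id': 'A3VZVMI3JWCFPD', 'avg_rating': 5.0}
```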
Zaman is a hero. I needed to add [] in my TupToDict function to return the actual dictionary (as sketched below).
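In code, that fix is equivalent to the yield version above (a sketch, using the same field names as the question):

```python
class TupToDict(beam.DoFn):
    def process(self, element):
        di = {'product_id': element[0], 'avg_rating': element[1]}
        # Wrap the dict in a one-element list: Beam iterates the list
        # and emits the dict whole. `yield di` works equally well.
        return [di]
```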
I changed the runner to Dataflow and it went into BQ without a hitch! Thanks!
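For reference, a sketch of the only change needed to switch runners, with the flag values taken from the question (depending on the setup, Dataflow may additionally require options such as --region):

```python
argv = [
    '--project={0}'.format(PROJECT),
    '--staging_location=gs://{0}/'.format(BUCKET),
    '--temp_location=gs://{0}/'.format(BUCKET),
    '--runner=DataflowRunner',  # was DirectRunner for local testing
]
```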