Apache Beam ETL 维度 table 加载,有什么例子吗?
Apache Beam ETL dimension table loading , any example?
我正在考虑将文件加载到一个维度中 table。我的解决方案是:
- Beam.read 文件
- 从数据库创建关于现有数据的辅助输入。
- 在 ParDo 中:过滤已在侧输入中的记录
- biquerySink into DB.
想问一下是否有人实现了这个?你能给我举个例子吗?
谢谢
can you give me some example about coGroupByKey. I understand that it may look like below : Sorry,I am newbie to Dataflow,and watching codes is the best way to me
step 1: sourcedata = beam.ReadFromText(...)
step 2: existing_table = beam.pvalue.AsDict(p
| beam.Read(beam.BigQuerySource(my_query)
| beam.Map(format_rows)
I assume the structure of sourcedata and existing data is the same :<k,v>
step 3: source_existing_Data= {sourcedata,existing_table}
|'coGroupBy' >> beam.coGroupByKey()
step4: new_Data = source_existing_Data | beam.filter(lamada (name,(existing,source)):source is NONE))
step 5: bigQuerySink(new_Data)
辅助输入是一个不错的选择,但考虑到如果您的数据库 table 非常大,您稍后可能会发现 CoGroupByKey
是更好的选择。要在侧面输入中实现此功能,您需要执行以下操作:
p = beam.Pipeline(..)
existing_table = beam.pvalue.AsDict(p
| beam.Read(beam.io.BigQuerySource(my_query)
| beam.Map(format_rows))
class FilterRowsDoFn(beam.DoFn):
def process(self, elem, table_dict):
k = elem[0]
if k not in table_dict:
yield elem
result = (p
| beam.ReadFromText(...)
| beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))
然后就可以将结果写入BQ了。但是,如果您的 table 已经包含很多元素,您可能需要考虑使用 CoGroupByKey
.
使用 CoGroupByKey 完成此操作的代码应如下所示:
sourcedata = (p
| beam.ReadFromText(...)
| beam.Map(format_text))
existing_table = (p
| beam.Read(beam.io.BigQuerySource(my_query)
| beam.Map(format_rows))
source_existing_data = ((sourcedata, existing_table)
| 'coGroupBy' >> beam.coGroupByKey())
new_data = (source_existing_data
| beam.Filter(lamada (name, (source, existing)): not list(source))
| beam.FlatMap(lambda (name, (source, existing)): [(name, s) for s in source]))
result = new_data | bigQuerySink(new_Data)
如果您在使用任何一个代码片段时遇到任何问题,请告诉我,我会修复它们。
For the row coming from the text file and row coming form BIGQUERY needed to be done with function :
from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn
and also the new data also after coGroupKey and Filter also need to convert since what get from coGroupKey is Tuple, so need to convert it from Dict or List.
Below is the detailed codes:
#####################################################################
# Develop by Emma 2017/08/19
#####################################################################
import argparse
import logging
from random import randrange
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
import sys
sys.path.append("..")
from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn
def configure_bigquery_write():
return [
('CAND_ID', 'STRING'),
('CAND_NAME', 'STRING'),
]
class BuildRowFn(beam.DoFn):
def process(self, element):
row = {}
for entry in element:
print('start')
print(entry)
# print(entry[0])
# print(entry[1])
print('end')
row['CAND_ID'] = entry[0]
row['CAND_NAME'] = entry[1]
yield row
def run(argv=None):
"""Run the workflow."""
# schema = 'CAND_ID:STRING,CAND_NAME:STRING'
schema = 'CAND_ID:STRING,CAND_NAME:STRING'
parser = argparse.ArgumentParser()
parser.add_argument('--input', default=r'd:/resource/test*')
parser.add_argument('--output', default=r'd:/output/test/new_emma')
# parser.add_argument('--project', default='chinarose_project')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
pipeline_options.view_as(GoogleCloudOptions).project = 'chinarose_project'
# query = 'select store FROM [chinarose_project:emma_test.sales]'
query = 'select CAND_ID ,CAND_NAME from emma_test.campaign'
p = beam.Pipeline(options=pipeline_options)
# get the length of the word and write them in the text file,noticed the UDF
source_data = (p | beam.io.ReadFromText(known_args.input)
| beam.Map(lambda a: a.split(","))
| beam.ParDo(BuildTupleRowFn())
)
# source_data | 'write' >> WriteToText(known_args.output)
# source_data | WriteToText(known_args.output)
print("connect to BQ")
existing_data= (p | beam.io.Read(beam.io.BigQuerySource(query=query, project='chinarose_project'))
| beam.ParDo(BuildDictTupleRowFn())
)
#existing_data | WriteToText(known_args.output)
source_existing_data = ((source_data, existing_data)
| 'GoGroupBy' >> beam.CoGroupByKey())
# source_existing_data |'write to text' >> WriteToText(known_args.output)
new_data = (source_existing_data | beam.Filter(lambda (name, (source, existing)): len(existing) == 0)
| beam.Map(lambda (name, (source, existing)): [(name, s) for s in source])
| beam.ParDo(BuildRowFn())
| beam.io.Write(beam.io.BigQuerySink(table='campaign_emma_v2', dataset='emma_test',project='chinarose_project',schema=schema))
)
#new_data | 'write to text' >> WriteToText(known_args.output)
p.run().wait_until_finish()
if __name__ == '__main__':
# logging.getLogger().setLevel(logging.INFO)
print('begin')
run()
print('end')
我正在考虑将文件加载到一个维度中 table。我的解决方案是:
- Beam.read 文件
- 从数据库创建关于现有数据的辅助输入。
- 在 ParDo 中:过滤已在侧输入中的记录
- biquerySink into DB.
想问一下是否有人实现了这个?你能给我举个例子吗? 谢谢
can you give me some example about coGroupByKey. I understand that it may look like below : Sorry,I am newbie to Dataflow,and watching codes is the best way to me
step 1: sourcedata = beam.ReadFromText(...)
step 2: existing_table = beam.pvalue.AsDict(p
| beam.Read(beam.BigQuerySource(my_query)
| beam.Map(format_rows)
I assume the structure of sourcedata and existing data is the same :<k,v>
step 3: source_existing_Data= {sourcedata,existing_table}
|'coGroupBy' >> beam.coGroupByKey()
step4: new_Data = source_existing_Data | beam.filter(lamada (name,(existing,source)):source is NONE))
step 5: bigQuerySink(new_Data)
辅助输入是一个不错的选择,但考虑到如果您的数据库 table 非常大,您稍后可能会发现 CoGroupByKey
是更好的选择。要在侧面输入中实现此功能,您需要执行以下操作:
p = beam.Pipeline(..)
existing_table = beam.pvalue.AsDict(p
| beam.Read(beam.io.BigQuerySource(my_query)
| beam.Map(format_rows))
class FilterRowsDoFn(beam.DoFn):
def process(self, elem, table_dict):
k = elem[0]
if k not in table_dict:
yield elem
result = (p
| beam.ReadFromText(...)
| beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))
然后就可以将结果写入BQ了。但是,如果您的 table 已经包含很多元素,您可能需要考虑使用 CoGroupByKey
.
使用 CoGroupByKey 完成此操作的代码应如下所示:
sourcedata = (p
| beam.ReadFromText(...)
| beam.Map(format_text))
existing_table = (p
| beam.Read(beam.io.BigQuerySource(my_query)
| beam.Map(format_rows))
source_existing_data = ((sourcedata, existing_table)
| 'coGroupBy' >> beam.coGroupByKey())
new_data = (source_existing_data
| beam.Filter(lamada (name, (source, existing)): not list(source))
| beam.FlatMap(lambda (name, (source, existing)): [(name, s) for s in source]))
result = new_data | bigQuerySink(new_Data)
如果您在使用任何一个代码片段时遇到任何问题,请告诉我,我会修复它们。
For the row coming from the text file and row coming form BIGQUERY needed to be done with function :
from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn
and also the new data also after coGroupKey and Filter also need to convert since what get from coGroupKey is Tuple, so need to convert it from Dict or List.
Below is the detailed codes:
#####################################################################
# Develop by Emma 2017/08/19
#####################################################################
import argparse
import logging
from random import randrange
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
import sys
sys.path.append("..")
from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn
def configure_bigquery_write():
return [
('CAND_ID', 'STRING'),
('CAND_NAME', 'STRING'),
]
class BuildRowFn(beam.DoFn):
def process(self, element):
row = {}
for entry in element:
print('start')
print(entry)
# print(entry[0])
# print(entry[1])
print('end')
row['CAND_ID'] = entry[0]
row['CAND_NAME'] = entry[1]
yield row
def run(argv=None):
"""Run the workflow."""
# schema = 'CAND_ID:STRING,CAND_NAME:STRING'
schema = 'CAND_ID:STRING,CAND_NAME:STRING'
parser = argparse.ArgumentParser()
parser.add_argument('--input', default=r'd:/resource/test*')
parser.add_argument('--output', default=r'd:/output/test/new_emma')
# parser.add_argument('--project', default='chinarose_project')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
pipeline_options.view_as(GoogleCloudOptions).project = 'chinarose_project'
# query = 'select store FROM [chinarose_project:emma_test.sales]'
query = 'select CAND_ID ,CAND_NAME from emma_test.campaign'
p = beam.Pipeline(options=pipeline_options)
# get the length of the word and write them in the text file,noticed the UDF
source_data = (p | beam.io.ReadFromText(known_args.input)
| beam.Map(lambda a: a.split(","))
| beam.ParDo(BuildTupleRowFn())
)
# source_data | 'write' >> WriteToText(known_args.output)
# source_data | WriteToText(known_args.output)
print("connect to BQ")
existing_data= (p | beam.io.Read(beam.io.BigQuerySource(query=query, project='chinarose_project'))
| beam.ParDo(BuildDictTupleRowFn())
)
#existing_data | WriteToText(known_args.output)
source_existing_data = ((source_data, existing_data)
| 'GoGroupBy' >> beam.CoGroupByKey())
# source_existing_data |'write to text' >> WriteToText(known_args.output)
new_data = (source_existing_data | beam.Filter(lambda (name, (source, existing)): len(existing) == 0)
| beam.Map(lambda (name, (source, existing)): [(name, s) for s in source])
| beam.ParDo(BuildRowFn())
| beam.io.Write(beam.io.BigQuerySink(table='campaign_emma_v2', dataset='emma_test',project='chinarose_project',schema=schema))
)
#new_data | 'write to text' >> WriteToText(known_args.output)
p.run().wait_until_finish()
if __name__ == '__main__':
# logging.getLogger().setLevel(logging.INFO)
print('begin')
run()
print('end')