Python 上的 Apache Beam 增加了 beam.Map 次调用
Apache Beam on Python multiplies beam.Map calls
我正在使用 DataFlow 开发管道,它必须执行以下操作:
- 从 BigQuery 中提取最后一个详细的项目(来自 2 个不同的路径)
- 对于每个路径,通过 SFTP 获取新项目并将它们保存到本地文件系统
- 将文件上传到 Google 云存储
我必须在本地获取文件,因为管道不在 DataFlow 集群上 运行(这不是最终代码...)。
从 BigQuery select 我得到两条记录:
我使用这 2 个输出作为 SFTP 函数的输入(仅下载路径 1951 的 1 个文件,'1951_2019112215.log.gz',路径 1952 没有文件)然后我 return 带有路径名称和下载文件的字典:
{'1951': ['1951_2019112215.log.gz']}
{'1952': []}
现在我调用将它们上传到 GC 存储桶的函数,我希望它被调用两次,每个输入一次...但它被调用 8 次(每个输入 4 次),与下载的数量无关文件。
你能解释一下为什么以及我错过了什么吗?
这是管道:
(p
| 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table'])) #output 2 records from BQ
| 'Get Files from Server' >> beam.Map(import_file) #Download files from SFTP and returns the 2 dictionaries above, 1 per call
| 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket) #it is called 4 time per input
)
和
def import_file(element):
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
srv = pysftp.Connection(host=config['SFTP']['host'],
username=config['SFTP']['username'],
private_key=os.path.join('config', config['SFTP']['private_key']),
cnopts=cnopts)
list_downloaded_files = []
last = element['last_file']
file_type = element['folder']
if file_type == '1951':
folder = config['SFTP']['folder_1951']
else:
folder = config['SFTP']['folder_1952']
file_list = srv.listdir(folder)
if len(file_list) > 0:
file_list = file_list[file_list.index(last)+1:]
for file in file_list:
srv.get(remotepath=folder+'/'+str(file), # da cambiare quando gira sul cluster
#remotepath=os.path.join(folder, str(file)),
localpath=os.path.join('download', file))
list_downloaded_files.append(str(file))
return {file_type: list_downloaded_files}
和
def upload_file_on_bucket(list_of_files):
print('chiamata')
print(list_of_files)
if '1951' in list_of_files:
file_type = '1951'
list_of_files = list_of_files['1951']
else:
file_type = '1952'
list_of_files = list_of_files['1952']
client = storage.Client(project='MYPROJECT')
bucket = client.get_bucket('MYBUCKET')
if len(list_of_files) > 0:
for file in list_of_files:
blob = bucket.blob('MYPATH' + file)
blob.upload_from_filename(os.path.join('download', file))
#os.remove(os.path.join('download', file))
list_of_files.sort()
print({file_type: list_of_files[-1]})
return {file_type: list_of_files[-1]}
如果我们 return 一个字典而不是一个列表或来自前一个函数的生成器,函数调用可以被重复。为此,我们应该使用 return [element]
或 yield element
而不是 return element
。使用以下数据展示这一点的示例:
data = [{'team': 'red', 'score': 10},
{'team': 'blue', 'score': 8}]
我们背靠背调用相同的 ParDo
:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Log results 1' >> beam.ParDo(LogResults())
| 'Log results 2' >> beam.ParDo(LogResults()))
并且我们每次只取消注释 return/yield 行中的一行:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Event: %s", element)
# yield element # option 1
# return element # option 2
# return [element] # option 3
我们比较所有三个选项的日志输出:
yield element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
return element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: score
INFO:root:Event: team
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: score
INFO:root:Event: team
return [element]
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
当我们使用 return element
. 时,LogResults
将被调用两次(每个字典键一次)
我正在使用 DataFlow 开发管道,它必须执行以下操作:
- 从 BigQuery 中提取最后一个详细的项目(来自 2 个不同的路径)
- 对于每个路径,通过 SFTP 获取新项目并将它们保存到本地文件系统
- 将文件上传到 Google 云存储
我必须在本地获取文件,因为管道不在 DataFlow 集群上 运行(这不是最终代码...)。
从 BigQuery select 我得到两条记录:
我使用这 2 个输出作为 SFTP 函数的输入(仅下载路径 1951 的 1 个文件,'1951_2019112215.log.gz',路径 1952 没有文件)然后我 return 带有路径名称和下载文件的字典:
{'1951': ['1951_2019112215.log.gz']}
{'1952': []}
现在我调用将它们上传到 GC 存储桶的函数,我希望它被调用两次,每个输入一次...但它被调用 8 次(每个输入 4 次),与下载的数量无关文件。
你能解释一下为什么以及我错过了什么吗? 这是管道:
(p
| 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table'])) #output 2 records from BQ
| 'Get Files from Server' >> beam.Map(import_file) #Download files from SFTP and returns the 2 dictionaries above, 1 per call
| 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket) #it is called 4 time per input
)
和
def import_file(element):
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
srv = pysftp.Connection(host=config['SFTP']['host'],
username=config['SFTP']['username'],
private_key=os.path.join('config', config['SFTP']['private_key']),
cnopts=cnopts)
list_downloaded_files = []
last = element['last_file']
file_type = element['folder']
if file_type == '1951':
folder = config['SFTP']['folder_1951']
else:
folder = config['SFTP']['folder_1952']
file_list = srv.listdir(folder)
if len(file_list) > 0:
file_list = file_list[file_list.index(last)+1:]
for file in file_list:
srv.get(remotepath=folder+'/'+str(file), # da cambiare quando gira sul cluster
#remotepath=os.path.join(folder, str(file)),
localpath=os.path.join('download', file))
list_downloaded_files.append(str(file))
return {file_type: list_downloaded_files}
和
def upload_file_on_bucket(list_of_files):
print('chiamata')
print(list_of_files)
if '1951' in list_of_files:
file_type = '1951'
list_of_files = list_of_files['1951']
else:
file_type = '1952'
list_of_files = list_of_files['1952']
client = storage.Client(project='MYPROJECT')
bucket = client.get_bucket('MYBUCKET')
if len(list_of_files) > 0:
for file in list_of_files:
blob = bucket.blob('MYPATH' + file)
blob.upload_from_filename(os.path.join('download', file))
#os.remove(os.path.join('download', file))
list_of_files.sort()
print({file_type: list_of_files[-1]})
return {file_type: list_of_files[-1]}
如果我们 return 一个字典而不是一个列表或来自前一个函数的生成器,函数调用可以被重复。为此,我们应该使用 return [element]
或 yield element
而不是 return element
。使用以下数据展示这一点的示例:
data = [{'team': 'red', 'score': 10},
{'team': 'blue', 'score': 8}]
我们背靠背调用相同的 ParDo
:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Log results 1' >> beam.ParDo(LogResults())
| 'Log results 2' >> beam.ParDo(LogResults()))
并且我们每次只取消注释 return/yield 行中的一行:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Event: %s", element)
# yield element # option 1
# return element # option 2
# return [element] # option 3
我们比较所有三个选项的日志输出:
yield element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
return element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: score
INFO:root:Event: team
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: score
INFO:root:Event: team
return [element]
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
当我们使用 return element
. 时,LogResults
将被调用两次(每个字典键一次)