Apache Beam on Python multiplies beam.Map calls

I'm developing a pipeline with Dataflow, and it has to do the following:

I have to fetch the files locally, because the pipeline doesn't run on a Dataflow cluster yet (this is not the final code...).

From the select on BigQuery I get two records:

I use these 2 outputs as the input of an SFTP function (only 1 file, '1951_2019112215.log.gz', is downloaded for path 1951, and no files for path 1952), and then I return a dictionary with the path name and the downloaded files:

{'1951': ['1951_2019112215.log.gz']}
{'1952': []}

Now I call the function that uploads them to a GCS bucket. I expect it to be called twice, once per input... but it is called 8 times (4 times per input), regardless of the number of downloaded files.

Can you explain why, and what am I missing? Here is the pipeline:

(p
    | 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table'])) # outputs the 2 records from BQ
    | 'Get Files from Server' >> beam.Map(import_file) # downloads the files from SFTP and returns the 2 dictionaries above, 1 per call
    | 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket) # it is called 4 times per input
 )

def import_file(element):
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    srv = pysftp.Connection(host=config['SFTP']['host'],
                            username=config['SFTP']['username'],
                            private_key=os.path.join('config', config['SFTP']['private_key']),
                            cnopts=cnopts)

    list_downloaded_files = []

    last = element['last_file']
    file_type = element['folder']

    if file_type == '1951':
        folder = config['SFTP']['folder_1951']
    else:
        folder = config['SFTP']['folder_1952']

    file_list = srv.listdir(folder)

    if len(file_list) > 0:
        file_list = file_list[file_list.index(last)+1:]

    for file in file_list:
        srv.get(remotepath=folder+'/'+str(file), # to be changed when it runs on the cluster
                #remotepath=os.path.join(folder, str(file)),
                localpath=os.path.join('download', file))
        list_downloaded_files.append(str(file))

    return {file_type: list_downloaded_files}

def upload_file_on_bucket(list_of_files):
    print('called')
    print(list_of_files)
    if '1951' in list_of_files:
        file_type = '1951'
        list_of_files = list_of_files['1951']
    else:
        file_type = '1952'
        list_of_files = list_of_files['1952']

    client = storage.Client(project='MYPROJECT')
    bucket = client.get_bucket('MYBUCKET')

    if len(list_of_files) > 0:
        for file in list_of_files:
            blob = bucket.blob('MYPATH' + file)
            blob.upload_from_filename(os.path.join('download', file))
            #os.remove(os.path.join('download', file))

    list_of_files.sort()
    print({file_type: list_of_files[-1]})
    return {file_type: list_of_files[-1]}

Function calls can be duplicated if we return a dictionary from the previous function instead of a list or a generator. To avoid this, we should use return [element] or yield element instead of return element. An example that demonstrates this with the following data:

data = [{'team': 'red', 'score': 10},
        {'team': 'blue', 'score': 8}]

We call the same ParDo twice back to back:

events = (p
  | 'Create Events' >> beam.Create(data)
  | 'Log results 1' >> beam.ParDo(LogResults())
  | 'Log results 2' >> beam.ParDo(LogResults()))

and each time we uncomment only one of the return/yield lines:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Event: %s", element)

    # yield element  # option 1
    # return element  # option 2
    # return [element]  # option 3

We compare the log output of all three options:

yield element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}

return element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: score
INFO:root:Event: team
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: score
INFO:root:Event: team

return [element]
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
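
For reference, here is a minimal, self-contained harness to reproduce these logs locally; this is a sketch that assumes the apache-beam package and the default DirectRunner:

import logging

import apache_beam as beam

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Event: %s", element)
    yield element  # option 1; swap in the other options to compare

data = [{'team': 'red', 'score': 10},
        {'team': 'blue', 'score': 8}]

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  with beam.Pipeline() as p:
    (p
     | 'Create Events' >> beam.Create(data)
     | 'Log results 1' >> beam.ParDo(LogResults())
     | 'Log results 2' >> beam.ParDo(LogResults()))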
Note how, when we use return element, LogResults is called twice per record, once per dictionary key. This is because Beam iterates over whatever process returns, and iterating a dict yields its keys, so each key becomes a separate downstream element.
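
The same mechanism applies to any transform that iterates over the function's return value, such as beam.ParDo and beam.FlatMap: a bare dict is iterated key by key. Here is a minimal sketch contrasting the two return styles with beam.FlatMap, reusing the dictionary from the question:

import apache_beam as beam

def bare_dict(element):
  # FlatMap iterates over the return value; iterating a dict yields its
  # KEYS, so only the string '1951' is emitted downstream.
  return {'1951': ['1951_2019112215.log.gz']}

def wrapped_dict(element):
  # Wrapping the dict in a list emits the whole dict as ONE element.
  return [{'1951': ['1951_2019112215.log.gz']}]

with beam.Pipeline() as p:
  (p
   | 'Seed' >> beam.Create([None])
   | 'Bare' >> beam.FlatMap(bare_dict)  # emits: '1951'
   | 'Print bare' >> beam.Map(print))

with beam.Pipeline() as p:
  (p
   | 'Seed 2' >> beam.Create([None])
   | 'Wrapped' >> beam.FlatMap(wrapped_dict)  # emits the whole dict
   | 'Print wrapped' >> beam.Map(print))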