每个输入的动态 PCollection

Dynamic PCollection per input

我有这个管道,它根据作业的动态输入获取一组 URL。 我的问题是访问 source 的动态值,其中 source.get() 除了默认值外没有 return 任何东西,通常我想在 fetch_urls 后面使用一些 PCollection 函数。

正在查看 Python transform catalog overview 我找不到可以回答我的请求的内容。

我如何根据获取某些值的动态值设置 PCollection。

谢谢

def fetch_urls(source):
    sources = source.get()
    # some logic
    return urls
              
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
            '--source',
            default='my_source',
            type=str,
            help='my_source')   
                
def run():

    parser = argparse.ArgumentParser()
    args, beam_args = parser.parse_known_args()
    
    pipeline_options = PipelineOptions()
    custom_options = pipeline_options.view_as(UserOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        
        urls = fetch_urls(custom_options.source)

        results = (p | 'Set URLs' >> beam.Create(urls)
                        | 'Strip' >> beam.Map(str.strip))

if __name__ == '__main__':
    run()

我找到了一些解决方法

results = (p | 'Set path' >> beam.Create([custom_options.urls])
             | 'Map URls' >> beam.FlatMap(fetch_urls)
             | 'Strip' >> beam.Map(str.strip))

您只获得默认值的原因是您试图在管道构建期间使用运行时值提供程序。运行时值提供程序旨在允许在构建管道之后、开始执行之前为管道指定值。但是,您编写的 fetch_urls 函数是在您构建管道时执行的,而不是在管道执行时执行的。

您在上面发布了一个“解决方法”:

results = (p | 'Set path' >> beam.Create([custom_options.urls])
             | 'Map URls' >> beam.FlatMap(fetch_urls)
             | 'Strip' >> beam.Map(str.strip))

这实际上不是 hacky 或解决方法,而是一个很好的解决方案。发生的事情是您创建了一个包含值提供者本身而不是原始值的 PCollection。当管道执行开始时,值提供程序已被填充,被放入 PCollection,然后传递给 fetch_urls,最后 get() 调用它并输出填充的值。

主要区别在于 get() 是在执行时调用的,因为 fetch_urls 是作为转换执行的,而不是在构造时直接调用。

如需进一步阅读,我建议阅读 Dataflow 网站上的 Creating Templates documentation。它有关于如何在您的管道中使用价值提供者的进一步解释和示例。