带有 apache beam 和运行时参数的动态路径

Dynamic paths with apache beam and Runtime parameters

我正在创建一个管道 TEMPLATE,它接收一些输入文件并计算其中的字数。到目前为止一切正常,但问题是我需要传递另一个参数(从我调用模板的函数),让我传递文件名,以便我可以用它创建路径。

尽管我知道管道在管道构造期间或在运行时上下文之外无法访问运行时参数,但我将向您展示我想要的示例,这可以帮助您了解我需要做什么:

class tempatableTest(PipelineOptions):
@classmethod
def _add_argparse_args(cls,parser):
    parser.add_value_provider_argument(
        '--input',
        type=str,
        help='path to the input file'
    )
    parser.add_value_provider_argument(
        '--fdinamic',
        type=str,
        help='folder name'
    )

templatable_test = PipelineOptions().view_as(tempatableTest)
beam_options= PipelineOptions()
input = templatable_test.input
dinamicName = templatable_test.fdinamic.get()

with beam.Pipeline(options=beam_options) as p:
    lines = p | beam.io.ReadFromText(input)
    len = lines | beam.combiners.Count.Globally()
    len | 'countTotalLen' >> beam.io.WriteToText(f'gs://bucket-test-out/processedFile/{dinamicName}/count.txt')

如果我使用 templatable_test.fdinamic.get() 我会得到运行时错误,但如果我删除 .get() 我会在文件夹上得到一个超长的名称。

我知道这可能不是正确的方法,只是为了说明我需要做什么,谢谢你的帮助。

遗憾的是,WriteToText 转换不能用于此,因为它目前仅支持固定目标。因此,为了将文件写入动态目标,您需要使用 fileio module which supports dynamic destinations. Although this does mean switching to using the experimental WriteToFiles 转换中的实用程序。