WARNING:apache_beam.options.pipeline_options:丢弃不可解析的参数

WARNING:apache_beam.options.pipeline_options:Discarding unparseable args

我目前有代码:

gs_folder = sys.argv[1]
options = PipelineOptions(
    runner='DataflowRunner',
    project='xxx',
    job_name=f'xxx{uuid.uuid4()}',
    region='us-central1',
    temp_location='xxx')

gfs = gcs.GCSFileSystem(options)
p = beam.Pipeline(options=options)

discover_empty = p | 'Filenames' >> beam.Create(gfs.match([gs_folder])[0].metadata_list) | \
            'Reshuffle' >> beam.Reshuffle() | \
            'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs))
p.run()

此代码基于问题 。最终发生的是我在下面收到此错误:

WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['gs://xx/xx']

这没有多大意义,因为这是我要执行此删除操作的文件夹。此外,看起来数据流作业 运行 成功通过,但应该删除的文件没有正确删除。我应该如何在此处传递管道选项 arg?

我还有几个关于这个过程的后续问题。本地好像是beam.Create() 运行,然后切换到dataflow。我怎样才能在数据流上制作那部分管道 运行?

确保您将标志传递为 --input=gs//... 该错误看起来您的命令行调用无效,并且 gs 路径被解释为整个参数。

beam.Create 作为管道的一部分运行,但传递给它的参数是在本地求值的。要改为在管道中计算它,请使用 beam.Create(None),然后使用运行匹配逻辑的 DoFn。

你可以作为命令行参数传递

--db_port5432--db_passXXXX

import argparse

# initializing Pipeline object - parse CL args
parser = argparse.ArgumentParser()
parser.add_argument('--db_pass')
parser.add_argument('--db_port')
parser.add_argument('--db_host')
args, beam_args = parser.parse_known_args()

# initializing Pipeline object
options = PipelineOptions(beam_args)
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)

args.db_pass // your value