如何将气流 DataFlowPythonOperator 用于光束管道?
How to use airflow DataFlowPythonOperator for beam pipeline?
在使用 DataFlowPythonOperator 之前,我使用的是 airflow 的 BashOperator。它工作正常。
我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。
仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。
python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
为了使我的光束流水线正常工作,我必须传递这些参数。
现在如何在 DataFlowPythonOperator 中传递这些参数?
我试过了,但我不知道应该在哪里提及所有参数。
我试过这样的事情:
task1 = DataFlowPythonOperator(
task_id = 'my_task',
py_file = '/home/airflow/gcs/pyfile.py',
gcp_conn_id='google_cloud_default',
options={
"num-workers" : 3,
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current',
"jobname" : 'my-job'
},
dataflow_default_options={
"project": 'my-project',
"staging_location": 'gs://path/Staging/',
"temp_location": 'gs://path/Temp/',
},
dag=dag
)
使用当前脚本(虽然我不确定它的格式是否正确)这是我在日志中得到的内容:
[2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job 810: Subtask my_task [2020-03-06 05:08:48,070] {cli.py:545} INFO - Running <TaskInstance: test-df-po.my_task 2020-02-29T00:00:00+00:00 [running]> on host airflow-worker-69b88ff66d-5wwrn
[2020-03-06 05:08:48,245] {taskinstance.py:1059} ERROR - 'int' object has no attribute '__len__'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 381, in execut
self.py_file, self.py_options
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 240, in start_python_dataflo
label_formatter
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 368, in wrappe
return func(self, *args, **kwargs
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 197, in _start_dataflo
cmd = command_prefix + self._build_cmd(variables, label_formatter
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 266, in _build_cm
elif value is None or value.__len__() < 1
AttributeError: 'int' object has no attribute '__len__
[2020-03-06 05:08:48,247] {base_task_runner.py:115} INFO - Job 810: Subtask my_task [2020-03-06 05:08:48,245] {taskinstance.py:1059} ERROR - 'int' object has no attribute '__len__'
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task Traceback (most recent call last):
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task result = task_copy.execute(context=context)
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 381, in execute
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task self.py_file, self.py_options)
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 240, in start_python_dataflow
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task label_formatter)
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 368, in wrapper
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return func(self, *args, **kwargs)
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 197, in _start_dataflow
[2020-03-06 05:08:48,250] {base_task_runner.py:115} INFO - Job 810: Subtask my_task cmd = command_prefix + self._build_cmd(variables, label_formatter)
[2020-03-06 05:08:48,250] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 266, in _build_cmd
[2020-03-06 05:08:48,251] {base_task_runner.py:115} INFO - Job 810: Subtask my_task elif value is None or value.__len__() < 1:
[2020-03-06 05:08:48,251] {taskinstance.py:1082} INFO - Marking task as UP_FOR_RETRY
[2020-03-06 05:08:48,253] {base_task_runner.py:115} INFO - Job 810: Subtask my_task AttributeError: 'int' object has no attribute '__len__'
[2020-03-06 05:08:48,254] {base_task_runner.py:115} INFO - Job 810: Subtask my_task [2020-03-06 05:08:48,251] {taskinstance.py:1082} INFO - Marking task as UP_FOR_RETRY
[2020-03-06 05:08:48,331] {base_task_runner.py:115} INFO - Job 810: Subtask my_task Traceback (most recent call last):
[2020-03-06 05:08:48,332] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/bin/airflow", line 7, in <module>
[2020-03-06 05:08:48,334] {base_task_runner.py:115} INFO - Job 810: Subtask my_task exec(compile(f.read(), __file__, 'exec'))
[2020-03-06 05:08:48,334] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/bin/airflow", line 37, in <module>
[2020-03-06 05:08:48,334] {base_task_runner.py:115} INFO - Job 810: Subtask my_task args.func(args)
[2020-03-06 05:08:48,335] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/utils/cli.py", line 74, in wrapper
[2020-03-06 05:08:48,336] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return f(*args, **kwargs)
[2020-03-06 05:08:48,336] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/bin/cli.py", line 551, in run
[2020-03-06 05:08:48,337] {base_task_runner.py:115} INFO - Job 810: Subtask my_task _run(args, dag, ti)
[2020-03-06 05:08:48,338] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/bin/cli.py", line 469, in _run
[2020-03-06 05:08:48,338] {base_task_runner.py:115} INFO - Job 810: Subtask my_task pool=args.pool,
[2020-03-06 05:08:48,339] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2020-03-06 05:08:48,340] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return func(*args, **kwargs)
[2020-03-06 05:08:48,341] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2020-03-06 05:08:48,342] {base_task_runner.py:115} INFO - Job 810: Subtask my_task result = task_copy.execute(context=context)
[2020-03-06 05:08:48,342] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 381, in execute
[2020-03-06 05:08:48,343] {base_task_runner.py:115} INFO - Job 810: Subtask my_task self.py_file, self.py_options)
[2020-03-06 05:08:48,343] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 240, in start_python_dataflow
[2020-03-06 05:08:48,344] {base_task_runner.py:115} INFO - Job 810: Subtask my_task label_formatter)
[2020-03-06 05:08:48,345] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 368, in wrapper
[2020-03-06 05:08:48,345] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return func(self, *args, **kwargs)
[2020-03-06 05:08:48,346] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 197, in _start_dataflow
[2020-03-06 05:08:48,347] {base_task_runner.py:115} INFO - Job 810: Subtask my_task cmd = command_prefix + self._build_cmd(variables, label_formatter)
[2020-03-06 05:08:48,349] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 266, in _build_cmd
[2020-03-06 05:08:48,350] {base_task_runner.py:115} INFO - Job 810: Subtask my_task elif value is None or value.__len__() < 1:
[2020-03-06 05:08:48,350] {base_task_runner.py:115} INFO - Job 810: Subtask my_task AttributeError: 'int' object has no attribute '__len__'
[2020-03-06 05:08:48,638] {helpers.py:308} INFO - Sending Signals.SIGTERM to GPID 8481
[2020-03-06 05:08:48,697] {helpers.py:286} INFO - Process psutil.Process(pid=8481, status='terminated') (8481) terminated with exit code -15
dataflow_operator
文档 here
在 gcp_dataflow_hook.py 中,_build_cmd() 正在检查 options
并构建命令。而在elif value is None or value.__len__() < 1:
中抛出了异常,因为num-workers
、3
的值是整数。所以你只需要将 3 更改为 '3' as string:
options={
"num-workers" : '3',
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current'
},
DataFlowHook._build_cmd():
@staticmethod
def _build_cmd(variables, label_formatter):
command = ["--runner=DataflowRunner"]
if variables is not None:
for attr, value in variables.items():
if attr == 'labels':
command += label_formatter(value)
elif value is None or value.__len__() < 1:
command.append("--" + attr)
else:
command.append("--" + attr + "=" + value)
return command
在使用 DataFlowPythonOperator 之前,我使用的是 airflow 的 BashOperator。它工作正常。 我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。
仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。
python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
为了使我的光束流水线正常工作,我必须传递这些参数。
现在如何在 DataFlowPythonOperator 中传递这些参数?
我试过了,但我不知道应该在哪里提及所有参数。 我试过这样的事情:
task1 = DataFlowPythonOperator(
task_id = 'my_task',
py_file = '/home/airflow/gcs/pyfile.py',
gcp_conn_id='google_cloud_default',
options={
"num-workers" : 3,
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current',
"jobname" : 'my-job'
},
dataflow_default_options={
"project": 'my-project',
"staging_location": 'gs://path/Staging/',
"temp_location": 'gs://path/Temp/',
},
dag=dag
)
使用当前脚本(虽然我不确定它的格式是否正确)这是我在日志中得到的内容:
[2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job 810: Subtask my_task [2020-03-06 05:08:48,070] {cli.py:545} INFO - Running <TaskInstance: test-df-po.my_task 2020-02-29T00:00:00+00:00 [running]> on host airflow-worker-69b88ff66d-5wwrn
[2020-03-06 05:08:48,245] {taskinstance.py:1059} ERROR - 'int' object has no attribute '__len__'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 381, in execut
self.py_file, self.py_options
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 240, in start_python_dataflo
label_formatter
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 368, in wrappe
return func(self, *args, **kwargs
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 197, in _start_dataflo
cmd = command_prefix + self._build_cmd(variables, label_formatter
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 266, in _build_cm
elif value is None or value.__len__() < 1
AttributeError: 'int' object has no attribute '__len__
[2020-03-06 05:08:48,247] {base_task_runner.py:115} INFO - Job 810: Subtask my_task [2020-03-06 05:08:48,245] {taskinstance.py:1059} ERROR - 'int' object has no attribute '__len__'
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task Traceback (most recent call last):
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task result = task_copy.execute(context=context)
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 381, in execute
[2020-03-06 05:08:48,248] {base_task_runner.py:115} INFO - Job 810: Subtask my_task self.py_file, self.py_options)
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 240, in start_python_dataflow
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task label_formatter)
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 368, in wrapper
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return func(self, *args, **kwargs)
[2020-03-06 05:08:48,249] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 197, in _start_dataflow
[2020-03-06 05:08:48,250] {base_task_runner.py:115} INFO - Job 810: Subtask my_task cmd = command_prefix + self._build_cmd(variables, label_formatter)
[2020-03-06 05:08:48,250] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 266, in _build_cmd
[2020-03-06 05:08:48,251] {base_task_runner.py:115} INFO - Job 810: Subtask my_task elif value is None or value.__len__() < 1:
[2020-03-06 05:08:48,251] {taskinstance.py:1082} INFO - Marking task as UP_FOR_RETRY
[2020-03-06 05:08:48,253] {base_task_runner.py:115} INFO - Job 810: Subtask my_task AttributeError: 'int' object has no attribute '__len__'
[2020-03-06 05:08:48,254] {base_task_runner.py:115} INFO - Job 810: Subtask my_task [2020-03-06 05:08:48,251] {taskinstance.py:1082} INFO - Marking task as UP_FOR_RETRY
[2020-03-06 05:08:48,331] {base_task_runner.py:115} INFO - Job 810: Subtask my_task Traceback (most recent call last):
[2020-03-06 05:08:48,332] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/bin/airflow", line 7, in <module>
[2020-03-06 05:08:48,334] {base_task_runner.py:115} INFO - Job 810: Subtask my_task exec(compile(f.read(), __file__, 'exec'))
[2020-03-06 05:08:48,334] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/bin/airflow", line 37, in <module>
[2020-03-06 05:08:48,334] {base_task_runner.py:115} INFO - Job 810: Subtask my_task args.func(args)
[2020-03-06 05:08:48,335] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/utils/cli.py", line 74, in wrapper
[2020-03-06 05:08:48,336] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return f(*args, **kwargs)
[2020-03-06 05:08:48,336] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/bin/cli.py", line 551, in run
[2020-03-06 05:08:48,337] {base_task_runner.py:115} INFO - Job 810: Subtask my_task _run(args, dag, ti)
[2020-03-06 05:08:48,338] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/bin/cli.py", line 469, in _run
[2020-03-06 05:08:48,338] {base_task_runner.py:115} INFO - Job 810: Subtask my_task pool=args.pool,
[2020-03-06 05:08:48,339] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2020-03-06 05:08:48,340] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return func(*args, **kwargs)
[2020-03-06 05:08:48,341] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2020-03-06 05:08:48,342] {base_task_runner.py:115} INFO - Job 810: Subtask my_task result = task_copy.execute(context=context)
[2020-03-06 05:08:48,342] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 381, in execute
[2020-03-06 05:08:48,343] {base_task_runner.py:115} INFO - Job 810: Subtask my_task self.py_file, self.py_options)
[2020-03-06 05:08:48,343] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 240, in start_python_dataflow
[2020-03-06 05:08:48,344] {base_task_runner.py:115} INFO - Job 810: Subtask my_task label_formatter)
[2020-03-06 05:08:48,345] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 368, in wrapper
[2020-03-06 05:08:48,345] {base_task_runner.py:115} INFO - Job 810: Subtask my_task return func(self, *args, **kwargs)
[2020-03-06 05:08:48,346] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 197, in _start_dataflow
[2020-03-06 05:08:48,347] {base_task_runner.py:115} INFO - Job 810: Subtask my_task cmd = command_prefix + self._build_cmd(variables, label_formatter)
[2020-03-06 05:08:48,349] {base_task_runner.py:115} INFO - Job 810: Subtask my_task File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 266, in _build_cmd
[2020-03-06 05:08:48,350] {base_task_runner.py:115} INFO - Job 810: Subtask my_task elif value is None or value.__len__() < 1:
[2020-03-06 05:08:48,350] {base_task_runner.py:115} INFO - Job 810: Subtask my_task AttributeError: 'int' object has no attribute '__len__'
[2020-03-06 05:08:48,638] {helpers.py:308} INFO - Sending Signals.SIGTERM to GPID 8481
[2020-03-06 05:08:48,697] {helpers.py:286} INFO - Process psutil.Process(pid=8481, status='terminated') (8481) terminated with exit code -15
dataflow_operator
文档 here
在 gcp_dataflow_hook.py 中,_build_cmd() 正在检查 options
并构建命令。而在elif value is None or value.__len__() < 1:
中抛出了异常,因为num-workers
、3
的值是整数。所以你只需要将 3 更改为 '3' as string:
options={
"num-workers" : '3',
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current'
},
DataFlowHook._build_cmd():
@staticmethod
def _build_cmd(variables, label_formatter):
command = ["--runner=DataflowRunner"]
if variables is not None:
for attr, value in variables.items():
if attr == 'labels':
command += label_formatter(value)
elif value is None or value.__len__() < 1:
command.append("--" + attr)
else:
command.append("--" + attr + "=" + value)
return command