将命令行参数提交给气流上的 pyspark 作业
Submit command line arguments to a pyspark job on airflow
我在 GCP Dataproc 上有一个 pyspark 作业可以在气流上触发,如下所示:
config = help.loadJSON("batch/config_file")
MY_PYSPARK_JOB = {
"reference": {"project_id": "my_project_id"},
"placement": {"cluster_name": "my_cluster_name"},
"pyspark_job": {
"main_python_file_uri": "gs://file/loc/my_spark_file.py"]
"properties": config["spark_properties"]
"args": <TO_BE_ADDED>
},
}
我需要向此 pyspark 作业提供命令行参数,如下所示[这就是我 运行 来自命令行的 pyspark 作业的方式]:
spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2
我正在使用 "configparser" 为我的 pyspark 作业提供参数。因此,arg1 是键,val1 是我上面的 spark-submit 命令中的值。
如何在上面定义的 "MY_PYSPARK_JOB" 中定义 "args" 参数 [等同于我的命令行参数]?
你必须通过 Sequence[str]
。如果你勾选 DataprocSubmitJobOperator you will see that the params job
implements a class google.cloud.dataproc_v1.types.Job.
class DataprocSubmitJobOperator(BaseOperator):
...
:param job: Required. The job resource. If a dict is provided, it must be of the same form as the protobuf message.
:class:`~google.cloud.dataproc_v1.types.Job`
因此,在关于工作类型 pySpark
的部分,即 google.cloud.dataproc_v1.types.PySparkJob:
args Sequence[str]
Optional. The arguments to pass to the driver. Do not include arguments, such as --conf
, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
我终于解决了这个难题。
如果我们使用 ConfigParser,则必须按如下方式指定密钥 [无论参数是作为命令还是在气流中传递]:
--arg1
在 airflow 中,配置作为 Sequence[str] 传递(如下面的@Betjens 所述),每个参数定义如下:
arg1=val1
因此,根据我的要求,命令行参数定义如下:
"args": ["--arg1=val1",
"--arg2=val2"]
PS:谢谢@Betjens 的所有建议。
我在 GCP Dataproc 上有一个 pyspark 作业可以在气流上触发,如下所示:
config = help.loadJSON("batch/config_file")
MY_PYSPARK_JOB = {
"reference": {"project_id": "my_project_id"},
"placement": {"cluster_name": "my_cluster_name"},
"pyspark_job": {
"main_python_file_uri": "gs://file/loc/my_spark_file.py"]
"properties": config["spark_properties"]
"args": <TO_BE_ADDED>
},
}
我需要向此 pyspark 作业提供命令行参数,如下所示[这就是我 运行 来自命令行的 pyspark 作业的方式]:
spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2
我正在使用 "configparser" 为我的 pyspark 作业提供参数。因此,arg1 是键,val1 是我上面的 spark-submit 命令中的值。
如何在上面定义的 "MY_PYSPARK_JOB" 中定义 "args" 参数 [等同于我的命令行参数]?
你必须通过 Sequence[str]
。如果你勾选 DataprocSubmitJobOperator you will see that the params job
implements a class google.cloud.dataproc_v1.types.Job.
class DataprocSubmitJobOperator(BaseOperator):
...
:param job: Required. The job resource. If a dict is provided, it must be of the same form as the protobuf message.
:class:`~google.cloud.dataproc_v1.types.Job`
因此,在关于工作类型 pySpark
的部分,即 google.cloud.dataproc_v1.types.PySparkJob:
args Sequence[str] Optional. The arguments to pass to the driver. Do not include arguments, such as
--conf
, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
我终于解决了这个难题。 如果我们使用 ConfigParser,则必须按如下方式指定密钥 [无论参数是作为命令还是在气流中传递]:
--arg1
在 airflow 中,配置作为 Sequence[str] 传递(如下面的@Betjens 所述),每个参数定义如下:
arg1=val1
因此,根据我的要求,命令行参数定义如下:
"args": ["--arg1=val1",
"--arg2=val2"]
PS:谢谢@Betjens 的所有建议。