将 pyfiles 和参数传递给 DataProcPySparkOperator
Pass pyfiles and arguments to DataProcPySparkOperator
我正在尝试将参数和压缩的 pyfiles 传递到 Composer
中的临时 Dataproc 集群
spark_args = {
'conn_id': 'spark_default',
'num_executors': 2,
'executor_cores': 2,
'executor_memory': '2G',
'driver_memory': '2G',
}
task = dataproc_operator.DataProcPySparkOperator(
task_id='spark_preprocess_{}'.format(name),
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region='europe-west4',
main='gs://my-bucket/dist/main.py',
pyfiles='gs://my-bucket/dist/jobs.zip',
dataproc_pyspark_properties=spark_args,
arguments=['--name', 'test', '--date', self.date_exec],
dag=subdag
)
但是我收到以下错误,知道如何正确格式化参数吗?
Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)
正如 中指出的那样,问题是 spark_args
具有非字符串值,但它应该只包含每个错误消息的字符串:
Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)
我正在尝试将参数和压缩的 pyfiles 传递到 Composer
中的临时 Dataproc 集群spark_args = {
'conn_id': 'spark_default',
'num_executors': 2,
'executor_cores': 2,
'executor_memory': '2G',
'driver_memory': '2G',
}
task = dataproc_operator.DataProcPySparkOperator(
task_id='spark_preprocess_{}'.format(name),
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region='europe-west4',
main='gs://my-bucket/dist/main.py',
pyfiles='gs://my-bucket/dist/jobs.zip',
dataproc_pyspark_properties=spark_args,
arguments=['--name', 'test', '--date', self.date_exec],
dag=subdag
)
但是我收到以下错误,知道如何正确格式化参数吗?
Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)
正如 spark_args
具有非字符串值,但它应该只包含每个错误消息的字符串:
Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)