将 pyfiles 和参数传递给 DataProcPySparkOperator

Pass pyfiles and arguments to DataProcPySparkOperator

我正在尝试将参数和压缩的 pyfiles 传递到 Composer

中的临时 Dataproc 集群
spark_args = {
    'conn_id': 'spark_default',
    'num_executors': 2,
    'executor_cores': 2,
    'executor_memory': '2G',
    'driver_memory': '2G',
}    

task = dataproc_operator.DataProcPySparkOperator(
                task_id='spark_preprocess_{}'.format(name),
                project_id=PROJECT_ID,
                cluster_name=CLUSTER_NAME,
                region='europe-west4',
                main='gs://my-bucket/dist/main.py',
                pyfiles='gs://my-bucket/dist/jobs.zip',
                dataproc_pyspark_properties=spark_args,
                arguments=['--name', 'test', '--date', self.date_exec],
                dag=subdag
            )

但是我收到以下错误,知道如何正确格式化参数吗?

Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)

正如 中指出的那样,问题是 spark_args 具有非字符串值,但它应该只包含每个错误消息的字符串:

Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)