Airflow - BigQuery 架构字段中的值无效

Airflow - BigQuery Invalid value at schema fields

我有一个 GCP 云编辑器可以将数据从 GCS 加载到 BQ。我正在使用 schema_fields 选项来传递源模式。我在变量中传递源模式。我从 xcom pull 得到这个。请参阅下面的 print(schema)

[{"mode": "NULLABLE", "name": "id", "type": "INTEGER"}, {"mode": "NULLABLE", "name": "c1", "type": "DATE"}, {"mode": "NULLABLE", "name": "c2", "type": "TIME"}, {"mode": "NULLABLE", "name": "c3", "type": "DATETIME"}, {"mode": "NULLABLE", "name": "c4", "type": "TIMESTAMP"}]

在 BQ 运算符中我使用 schema_fields=schema

但是当我运行 dag 时,它抛出错误。

ERROR - <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/xxx-xxx/jobs?
alt=json returned "Invalid value at 'job.configuration.load.schema.fields' (type.googleapis.com/google.cloud.bigquery.v2.TableFieldSchema), 
"[{"mode": "NULLABLE", "name": "id", "type": "INTEGER"}, {"mode": "NULLABLE", "name": "c1", "type": "DATE"}, {"mode": "NULLABLE", "name": "c2", "type": "TIME"}, {"mode": "NULLABLE", "name": "c3", "type": "DATETIME"}, {"mode": "NULLABLE", "name": "c4", "type": "TIMESTAMP"}]"">

但是,当我将此模式保存为 GCS 中的文件并尝试使用 schema_object 时,它就成功了。但是通过变量做同样的事情没有用。

我可以使用 json.loads

来解决这个问题
schema=json.loads(xcom_pull commands)

Schema_fields 参数接受有效列表。目前还没有模板化。

我已经使用了这个示例字段并且它正在工作。请检查类似的格式,看看它是否适合您。

schema_fields=[
                 {'name': 'Col1', 'type': 'STRING', 'mode': 'NULLABLE'},
                 {'name': 'Col2', 'type': 'STRING', 'mode': 'NULLABLE'},
             ]