读取 Airflow 中嵌套的 JSON 字段?

Read nested JSON fields in Airflow?

我有一个 API returns 一些嵌套的 json 数据,我希望气流读取这些数据。然而,气流给我一个错误,说嵌套字段是 NULL。这些字段不是 NULL,我可以在我向 API.

发出的任何手动获取请求中看到 json 数据

如何修改我的管道以读取这些嵌套字段?

我的 API returns 一个 JSON 对象,例如:

    {
    "email": "ronald@mcdonald.com",
    "first_name": "ronald",
    "last_name": "mcdonald",
    "permissions": {
        "make_burgers": true,
        "make_icecream": false,
        },
     }

我的气流管道对象看起来像:

class StaffPipeline(_DefaultPipeline):
    source_url = f'{config.MCDONALDS_BASE_URL}/staff'
    table_config = TableConfig(
        table_name='mcdonalds__staff',
        field_mapping=[
            ('email', sa.Column('email', sa.Text)),
            ('first_name', sa.Column('first_name', sa.Text)),
            ('last_name', sa.Column('last_name', sa.Text)),
            ('make_burgers', sa.Column('make_burgers', sa.Boolean)),
            ('make_icecream', sa.Column('make_icecream', sa.Boolean)),
        ],
        indexes=(LateIndex("email"), LateIndex("last_name")),
    )

尝试运行此管道时我的错误消息:

raise UnusedColumnError(error)
mcdonalds_data.operators.db_tables.UnusedColumnError: Column mcdonalds__staff_123456789.make_burgers only contains NULL values

谢谢!

答案是你必须像这样在气流中使用嵌套结构:

class StaffPipeline(_DefaultPipeline):
    source_url = f'{config.MCDONALDS_BASE_URL}/staff'
    table_config = TableConfig(
        table_name='mcdonalds__staff',
        field_mapping=[
            ('email', sa.Column('email', sa.Text)),
            ('first_name', sa.Column('first_name', sa.Text)),
            ('last_name', sa.Column('last_name', sa.Text)),
            (
                ("permissions", "make_burgers"),
            sa.Column('make_burgers', sa.Boolean),
            ),
            (
                ("permissions", "make_icecream"),
            sa.Column('make_icecream', sa.Boolean),
            ),
        ],
        indexes=(LateIndex("email"), LateIndex("last_name")),
    )