读取 Airflow 中嵌套的 JSON 字段?
Read nested JSON fields in Airflow?
我有一个 API returns 一些嵌套的 json 数据,我希望气流读取这些数据。然而,气流给我一个错误,说嵌套字段是 NULL。这些字段不是 NULL,我可以在我向 API.
发出的任何手动获取请求中看到 json 数据
如何修改我的管道以读取这些嵌套字段?
我的 API returns 一个 JSON 对象,例如:
{
"email": "ronald@mcdonald.com",
"first_name": "ronald",
"last_name": "mcdonald",
"permissions": {
"make_burgers": true,
"make_icecream": false,
},
}
我的气流管道对象看起来像:
class StaffPipeline(_DefaultPipeline):
source_url = f'{config.MCDONALDS_BASE_URL}/staff'
table_config = TableConfig(
table_name='mcdonalds__staff',
field_mapping=[
('email', sa.Column('email', sa.Text)),
('first_name', sa.Column('first_name', sa.Text)),
('last_name', sa.Column('last_name', sa.Text)),
('make_burgers', sa.Column('make_burgers', sa.Boolean)),
('make_icecream', sa.Column('make_icecream', sa.Boolean)),
],
indexes=(LateIndex("email"), LateIndex("last_name")),
)
尝试运行此管道时我的错误消息:
raise UnusedColumnError(error)
mcdonalds_data.operators.db_tables.UnusedColumnError: Column mcdonalds__staff_123456789.make_burgers only contains NULL values
谢谢!
答案是你必须像这样在气流中使用嵌套结构:
class StaffPipeline(_DefaultPipeline):
source_url = f'{config.MCDONALDS_BASE_URL}/staff'
table_config = TableConfig(
table_name='mcdonalds__staff',
field_mapping=[
('email', sa.Column('email', sa.Text)),
('first_name', sa.Column('first_name', sa.Text)),
('last_name', sa.Column('last_name', sa.Text)),
(
("permissions", "make_burgers"),
sa.Column('make_burgers', sa.Boolean),
),
(
("permissions", "make_icecream"),
sa.Column('make_icecream', sa.Boolean),
),
],
indexes=(LateIndex("email"), LateIndex("last_name")),
)
我有一个 API returns 一些嵌套的 json 数据,我希望气流读取这些数据。然而,气流给我一个错误,说嵌套字段是 NULL。这些字段不是 NULL,我可以在我向 API.
发出的任何手动获取请求中看到 json 数据如何修改我的管道以读取这些嵌套字段?
我的 API returns 一个 JSON 对象,例如:
{
"email": "ronald@mcdonald.com",
"first_name": "ronald",
"last_name": "mcdonald",
"permissions": {
"make_burgers": true,
"make_icecream": false,
},
}
我的气流管道对象看起来像:
class StaffPipeline(_DefaultPipeline):
source_url = f'{config.MCDONALDS_BASE_URL}/staff'
table_config = TableConfig(
table_name='mcdonalds__staff',
field_mapping=[
('email', sa.Column('email', sa.Text)),
('first_name', sa.Column('first_name', sa.Text)),
('last_name', sa.Column('last_name', sa.Text)),
('make_burgers', sa.Column('make_burgers', sa.Boolean)),
('make_icecream', sa.Column('make_icecream', sa.Boolean)),
],
indexes=(LateIndex("email"), LateIndex("last_name")),
)
尝试运行此管道时我的错误消息:
raise UnusedColumnError(error)
mcdonalds_data.operators.db_tables.UnusedColumnError: Column mcdonalds__staff_123456789.make_burgers only contains NULL values
谢谢!
答案是你必须像这样在气流中使用嵌套结构:
class StaffPipeline(_DefaultPipeline):
source_url = f'{config.MCDONALDS_BASE_URL}/staff'
table_config = TableConfig(
table_name='mcdonalds__staff',
field_mapping=[
('email', sa.Column('email', sa.Text)),
('first_name', sa.Column('first_name', sa.Text)),
('last_name', sa.Column('last_name', sa.Text)),
(
("permissions", "make_burgers"),
sa.Column('make_burgers', sa.Boolean),
),
(
("permissions", "make_icecream"),
sa.Column('make_icecream', sa.Boolean),
),
],
indexes=(LateIndex("email"), LateIndex("last_name")),
)