当 JSON schema 发生变化时,如何在 PyFlink SQL 中引用嵌套的 JSON?
How to reference nested JSON within PyFlink SQL when JSON schema varies?
我有一个事件流,我希望使用 PyFlink 处理这些事件,这些事件取自 AWS EventBridge。此流中的事件共享许多公共字段,但它们的 detail
字段根据 source
and/or detail-type
字段的值而变化。例如,这是来自 EC2 的示例事件:
{
"version": "0",
"id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
"detail-type": "EC2 Instance State-change Notification",
"source": "aws.ec2",
"account": "111122223333",
"time": "2017-12-22T18:43:48Z",
"region": "us-west-1",
"detail": {
"instance-id": " i-1234567890abcdef0",
"state": "terminated"
}
}
id
、version
、source
等字段在不同事件类型中是一致的,但请注意,流中不同类型的事件与 detail
字段,例如CodeBuild 事件可能如下所示:
"detail":{
"build-status": "SUCCEEDED",
"project-name": "my-sample-project",
"build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
"additional-information": {
"artifact": {
"md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
"sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
"location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
}
}
}
我想创建一个如下所示的语句,通过 detail-type
对流进行键控以提供不同的 sub-tables,其中每个 sub-table处理方式不同。
INSERT INTO ec2_event_table SELECT * from input_table WHERE source = 'aws.ec2'
INSERT INTO codebuild_event_table SELECT * from input_table WHERE source = 'aws.codebuild'
如何定义 input_table
(包含多路复用事件的输入 table)?我试过:
CREATE TABLE input_table (
source VARCHAR,
detail MAP
)
但这给了我一个错误。我需要指定 MAP
的类型,例如MAP<VARCHAR, VARCHAR>
,我无法使用MAP<>
。
如何使用 PyFlink SQL 引用深层嵌套 JSON?
我正在尝试使用 SQL 和 Table API 来实现,还是我需要使用 DataStream API?我不想为每个不同的事件类型创建不同的输入流。
列 detail
可以声明为 VARCHAR,然后 input_table
可以定义如下:
CREATE TABLE input_table (
version VARCHAR,
id VARCHAR,
detail-type VARCHAR,
source VARCHAR,
account VARCHAR,
time VARCHAR,
region VARCHAR,
detail VARCHAR
) with (
...
)
此外,如果您想处理列 detail
,您可以将其解析为 Python UDF 中的 json,如下所示:
@udf(result_type=DataTypes.STRING())
def get_id(detail):
detail_json = json.loads(detail)
if 'build-id' in detail_json:
return detail_json['build-id']
else:
return detail_json['instance-id']
我有一个事件流,我希望使用 PyFlink 处理这些事件,这些事件取自 AWS EventBridge。此流中的事件共享许多公共字段,但它们的 detail
字段根据 source
and/or detail-type
字段的值而变化。例如,这是来自 EC2 的示例事件:
{
"version": "0",
"id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
"detail-type": "EC2 Instance State-change Notification",
"source": "aws.ec2",
"account": "111122223333",
"time": "2017-12-22T18:43:48Z",
"region": "us-west-1",
"detail": {
"instance-id": " i-1234567890abcdef0",
"state": "terminated"
}
}
id
、version
、source
等字段在不同事件类型中是一致的,但请注意,流中不同类型的事件与 detail
字段,例如CodeBuild 事件可能如下所示:
"detail":{
"build-status": "SUCCEEDED",
"project-name": "my-sample-project",
"build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
"additional-information": {
"artifact": {
"md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
"sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
"location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
}
}
}
我想创建一个如下所示的语句,通过 detail-type
对流进行键控以提供不同的 sub-tables,其中每个 sub-table处理方式不同。
INSERT INTO ec2_event_table SELECT * from input_table WHERE source = 'aws.ec2'
INSERT INTO codebuild_event_table SELECT * from input_table WHERE source = 'aws.codebuild'
如何定义 input_table
(包含多路复用事件的输入 table)?我试过:
CREATE TABLE input_table (
source VARCHAR,
detail MAP
)
但这给了我一个错误。我需要指定 MAP
的类型,例如MAP<VARCHAR, VARCHAR>
,我无法使用MAP<>
。
如何使用 PyFlink SQL 引用深层嵌套 JSON?
我正在尝试使用 SQL 和 Table API 来实现,还是我需要使用 DataStream API?我不想为每个不同的事件类型创建不同的输入流。
列 detail
可以声明为 VARCHAR,然后 input_table
可以定义如下:
CREATE TABLE input_table (
version VARCHAR,
id VARCHAR,
detail-type VARCHAR,
source VARCHAR,
account VARCHAR,
time VARCHAR,
region VARCHAR,
detail VARCHAR
) with (
...
)
此外,如果您想处理列 detail
,您可以将其解析为 Python UDF 中的 json,如下所示:
@udf(result_type=DataTypes.STRING())
def get_id(detail):
detail_json = json.loads(detail)
if 'build-id' in detail_json:
return detail_json['build-id']
else:
return detail_json['instance-id']