KSQL Event Merging: Combining events from a single stream based on timestamp
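I'm trying to use KSQL to merge multiple events from a single input stream into single output events grouped by timestamp. I'd also like each output event to contain the average of its input events; that isn't strictly necessary, but it would be nice to have.
Input stream: temperature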
event1: {location: "hallway", value: 23, property_Id: "123", timestamp: "1551645625878"}
event2: {location: "bedroom", value: 21, property_Id: "123", timestamp: "1551645625878"}
event3: {location: "kitchen", value: 20, property_Id: "123", timestamp: "1551645625878"}
event4: {location: "hallway", value: 19, property_Id: "123", timestamp: "9991645925878"}
event5: {location: "bedroom", value: 18, property_Id: "123", timestamp: "9991645925878"}
event6: {location: "kitchen", value: 18, property_Id: "123", timestamp: "9991645925878"}
(Desired) output stream:
event1:
{
"property_id": "123",
"timestamp": "1551645625878",
"average_temperature": 21,
"temperature": [
{
"location": "hallway",
"value": 23
},
{
"location": "bedroom",
"value": 21
},
{
"location": "kitchen",
"value": 20
}
]
}
event2:
{
"property_id": "123",
"timestamp": "9991645925878",
"average_temperature": 18,
"temperature": [
{
"location": "hallway",
"value": 19
},
{
"location": "bedroom",
"value": 18
},
{
"location": "kitchen",
"value": 18
}
]
}
As far as I can tell, this isn't possible with KSQL. Can anyone confirm?
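Correct, you can't currently do this in KSQL. As of v5.1 (March 2019), KSQL can read nested objects but cannot construct them: https://github.com/confluentinc/ksql/issues/2147 (please upvote or comment on the issue if you need this).
You can do the average calculation with something like: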
SELECT timestamp, SUM(value)/COUNT(*) AS avg_temp \
FROM input_stream \
GROUP BY timestamp;
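If the nested output is needed right away, the merge can be done outside KSQL, for example in a small consumer application. The following is only a minimal Python sketch of the grouping logic, not a KSQL feature: the function name `merge_by_timestamp` and the `EVENTS` list are hypothetical, the events are plain dicts copied from the question, and reading from or writing to Kafka is left out entirely. Integer division is used so that the averages match the 21 and 18 shown in the desired output.

```python
from collections import defaultdict

# Sample events copied from the question (hypothetical in-memory stand-in
# for records read from the "temperature" topic).
EVENTS = [
    {"location": "hallway", "value": 23, "property_Id": "123", "timestamp": "1551645625878"},
    {"location": "bedroom", "value": 21, "property_Id": "123", "timestamp": "1551645625878"},
    {"location": "kitchen", "value": 20, "property_Id": "123", "timestamp": "1551645625878"},
    {"location": "hallway", "value": 19, "property_Id": "123", "timestamp": "9991645925878"},
    {"location": "bedroom", "value": 18, "property_Id": "123", "timestamp": "9991645925878"},
    {"location": "kitchen", "value": 18, "property_Id": "123", "timestamp": "9991645925878"},
]


def merge_by_timestamp(events):
    """Group events by (property id, timestamp) and build one merged record per group."""
    groups = defaultdict(list)
    for event in events:
        groups[(event["property_Id"], event["timestamp"])].append(event)

    merged = []
    # Dicts keep insertion order, so groups come out in first-seen order.
    for (property_id, timestamp), items in groups.items():
        values = [item["value"] for item in items]
        merged.append({
            "property_id": property_id,
            "timestamp": timestamp,
            # Integer division, chosen to match the question's expected output.
            "average_temperature": sum(values) // len(values),
            "temperature": [
                {"location": item["location"], "value": item["value"]}
                for item in items
            ],
        })
    return merged


if __name__ == "__main__":
    import json
    for record in merge_by_timestamp(EVENTS):
        print(json.dumps(record, indent=2))
```

Note that this sketch assumes all events for a given timestamp are available before merging. On a live stream you would also have to decide when a group is complete, for instance after a fixed wait or once all expected locations have reported.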