如何使用 flink 流式传输 json?
how to stream a json using flink?
我实际上正在处理流,接收一堆字符串并需要对所有字符串进行计数。总和被加总,这意味着对于第二条记录,总和被添加到前一天
输出必须是一些 json 看起来像
的文件
{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
{"date" : "2018-03-03",
"sum" : 120},
{"date" :"2018-03-04",
"sum" : 203}
]
}
我创建了一个看起来像这样的流:
val eventStream : DataStream [String] =
eventStream
.addSource(source)
.keyBy("")
.TimeWindow(Time.days(1), Time.days(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
.addSink(sink)
提前感谢您的帮助:)
在 Flink 中使用 JSON 的注意事项:
使用JSONDeserializationSchema
反序列化事件,这将产生ObjectNode
s。为方便起见,您可以将 ObjectNode
映射到 YourObject
或继续使用 ObjectNode
.
使用 ObjectNode
的教程:http://www.baeldung.com/jackson-json-node-tree-model
回到你的案例,你可以这样做:
val eventStream : DataStream [ObjectNode] =
oneMinuteAgg
.addSource(source)
.windowAll()
.TimeWindow(Time.minutes(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
将输出 1 分钟的聚合流
[
{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}
]
然后将另一个运算符链接到 "oneMinuteAgg",将 1 分钟聚合添加到 1 天聚合中:
[...]
oneMinuteAgg
.windowAll()
.TimeWindow(Time.days(1))
.trigger(new Whatever)
.aggregation(new YourDayAggF)
这将输出您需要的内容
{
"aggregationType" : "day"
"days before" : 4
"aggregates : [{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}]
}
我使用 windowAll()
假设您不需要为流设置密钥。
我实际上正在处理流,接收一堆字符串并需要对所有字符串进行计数。总和被加总,这意味着对于第二条记录,总和被添加到前一天 输出必须是一些 json 看起来像
的文件{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
{"date" : "2018-03-03",
"sum" : 120},
{"date" :"2018-03-04",
"sum" : 203}
]
}
我创建了一个看起来像这样的流:
val eventStream : DataStream [String] =
eventStream
.addSource(source)
.keyBy("")
.TimeWindow(Time.days(1), Time.days(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
.addSink(sink)
提前感谢您的帮助:)
在 Flink 中使用 JSON 的注意事项:
使用JSONDeserializationSchema
反序列化事件,这将产生ObjectNode
s。为方便起见,您可以将 ObjectNode
映射到 YourObject
或继续使用 ObjectNode
.
使用 ObjectNode
的教程:http://www.baeldung.com/jackson-json-node-tree-model
回到你的案例,你可以这样做:
val eventStream : DataStream [ObjectNode] =
oneMinuteAgg
.addSource(source)
.windowAll()
.TimeWindow(Time.minutes(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
将输出 1 分钟的聚合流
[
{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}
]
然后将另一个运算符链接到 "oneMinuteAgg",将 1 分钟聚合添加到 1 天聚合中:
[...]
oneMinuteAgg
.windowAll()
.TimeWindow(Time.days(1))
.trigger(new Whatever)
.aggregation(new YourDayAggF)
这将输出您需要的内容
{
"aggregationType" : "day"
"days before" : 4
"aggregates : [{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}]
}
我使用 windowAll()
假设您不需要为流设置密钥。