如何使用 flink 流式传输 json?

how to stream a json using flink?

我实际上正在处理流,接收一堆字符串并需要对所有字符串进行计数。总和被加总,这意味着对于第二条记录,总和被添加到前一天 输出必须是一些 json 看起来像

的文件
{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
    {"date" : "2018-03-03",
    "sum" : 120},
  {"date" :"2018-03-04",
  "sum" : 203}
  ]
}

我创建了一个看起来像这样的流:

val eventStream : DataStream [String] = 
eventStream
    .addSource(source)
    .keyBy("")
    .TimeWindow(Time.days(1), Time.days(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)
    .addSink(sink)

提前感谢您的帮助:)

在 Flink 中使用 JSON 的注意事项:

使用JSONDeserializationSchema反序列化事件,这将产生ObjectNodes。为方便起见,您可以将 ObjectNode 映射到 YourObject 或继续使用 ObjectNode.

使用 ObjectNode 的教程:http://www.baeldung.com/jackson-json-node-tree-model

回到你的案例,你可以这样做:

val eventStream : DataStream [ObjectNode] = 
oneMinuteAgg
    .addSource(source)
    .windowAll()
    .TimeWindow(Time.minutes(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)

将输出 1 分钟的聚合流

[     
      {
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }
]

然后将另一个运算符链接到 "oneMinuteAgg",将 1 分钟聚合添加到 1 天聚合中:

[...]
oneMinuteAgg
        .windowAll()
        .TimeWindow(Time.days(1))
        .trigger(new Whatever)
        .aggregation(new YourDayAggF)

这将输出您需要的内容

{
    "aggregationType" : "day"
    "days before" : 4
    "aggregates : [{
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }]
}

我使用 windowAll() 假设您不需要为流设置密钥。