Apache Flink aggregation of transactions
I have been trying to figure out how to write a Flink program that receives events from 3 Kafka topics and summarizes the events for today, yesterday, and the day before yesterday.
So the first question is: how can I aggregate the transactions of 3 different days and extract them as a JSON file?
If you want to read from 3 different Kafka topics or partitions, you have to create 3 Kafka sources.
Flink's documentation about the Kafka consumer
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer0 = new FlinkKafkaConsumer08[String](...)
val consumer1 = new FlinkKafkaConsumer08[String](...)
val consumer2 = new FlinkKafkaConsumer08[String](...)
consumer0.setStartFromGroupOffsets()
consumer1.setStartFromGroupOffsets()
consumer2.setStartFromGroupOffsets()
val stream0 = env.addSource(consumer0)
val stream1 = env.addSource(consumer1)
val stream2 = env.addSource(consumer2)
val unitedStream = stream0.union(stream1,stream2)
/* Logic to group transactions from 3 days */
/* I need more info, but it should be a sliding or tumbling window keyed by the id of the transactions */
val windowSize = 1 // number of days the window uses to group events
val windowStep = 1 // the window slides by 1 day
val reducedStream = unitedStream
  .map(transaction => {
    transaction.numberOfTransactions = 1 // count each event once, so sum() yields the count per key and window
    transaction
  })
  .keyBy("transactionId") // or any field that groups transactions in the same group
  .timeWindow(Time.days(windowSize), Time.days(windowStep))
  .sum("numberOfTransactions")
val streamFormattedAsJson = reducedStream.map(functionToParseDataAsJson)
// you can use a library like GSON for this
// or a Scala string template
streamFormattedAsJson.addSink(yourFavoriteSinkToWriteYourData)
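As an illustration, here is a minimal sketch of what functionToParseDataAsJson could look like with a Scala string template; the Transaction type and its fields (transactionId, numberOfTransactions) are assumptions taken from the keyBy/sum fields above, so adapt them to your actual class:

// Sketch only: turn each aggregated transaction into a JSON string.
// Transaction, transactionId and numberOfTransactions are assumed names.
def functionToParseDataAsJson(transaction: Transaction): String =
  s"""{"transactionId":"${transaction.transactionId}","numberOfTransactions":${transaction.numberOfTransactions}}"""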
If your topic names can be matched with a regular expression, you can create just a single Kafka consumer, like this:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer = new FlinkKafkaConsumer08[String](
java.util.regex.Pattern.compile("day-[1-3]"),
..., //check documentation to know how to fill this field
...) //check documentation to know how to fill this field
val stream = env.addSource(consumer)
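For completeness, here is a sketch of how the elided constructor arguments are usually filled in, with placeholder connection values and the basic string schema (check the documentation for your Flink and Kafka versions; the SimpleStringSchema import path has moved between releases):

// Sketch only: placeholder properties for the Kafka 0.8 consumer.
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker address
properties.setProperty("zookeeper.connect", "localhost:2181") // required by the 0.8 consumer, placeholder
properties.setProperty("group.id", "transaction-aggregator")  // placeholder consumer group

val patternConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("day-[1-3]"),
  new SimpleStringSchema(), // import path depends on your Flink version
  properties)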
The most common approach is to have all the transactions in the same Kafka topic instead of in different topics; in that case the code is simpler, because you only need a window to process your data:
Day 1 -> 11111 -\
Day 2 -> 22222 --> 1111122222333 -> Window -> 11111 22222 333 -> reduce operation per window partition
Day 3 -> 3333 --/ |-----|-----|---|
Sample code:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer = new FlinkKafkaConsumer08[String](...)
consumer.setStartFromGroupOffsets()
val stream = env.addSource(consumer)
/* Logic to group transactions from 3 days */
/* I need more info, but it should be a sliding or tumbling window keyed by the id of the transactions */
val windowSize = 1 // number of days the window uses to group events
val windowStep = 1 // the window slides by 1 day
val reducedStream = stream
  .map(transaction => {
    transaction.numberOfTransactions = 1 // count each event once, so sum() yields the count per key and window
    transaction
  })
  .keyBy("transactionId") // or any field that groups transactions in the same group
  .timeWindow(Time.days(windowSize), Time.days(windowStep))
  .sum("numberOfTransactions")
val streamFormattedAsJson = reducedStream.map(functionToParseDataAsJson)
// you can use a library like GSON for this
// or a Scala string template
streamFormattedAsJson.addSink(yourFavoriteSinkToWriteYourData)
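If all you need is the JSON written out as files, one simple option instead of a custom sink is the built-in text sink (a sketch; the output path is a placeholder), followed by the call that actually starts the job:

// Sketch only: write one JSON line per aggregated result; the path is a placeholder.
streamFormattedAsJson.writeAsText("/tmp/aggregated-transactions")
env.execute("transaction aggregation")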