Flink Streaming - 在 windows 中应用函数
Flink Streaming - apply function in windows
我也是 flink 和流媒体的新手。我想将每个分区的某个函数应用于流的每个 window(使用事件时间)。到目前为止我所做的是:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("dataset.txt")
.map(transformStream(_))
.assignAscendingTimestamps(_.eventTime)
.keyBy(_.id)
.timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))
def transformStream(input: String): EventStream = {...}
case class EventStream(val eventTime: Long, val id: String, actualEvent: String)
我想做的是对每个 window 批次的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我已经看到该方法适用于 DataStream API 但我不明白它是如何工作的。在 Flink API 中它说它在 Scala 中是这样使用的:
inputStream.apply { WindowFunction }
谁能解释一下 apply 方法的作用或使用方法? Scala 中的示例将更可取。 apply 方法是否符合我的要求?
就我而言,您可以将 map / flatmap / keyBy 函数调用应用于有状态窗口数据 val inputStream
以更改数据。所以如果你要创建
class DoSthWithYourStream {...}
您可以在其中定义方法和输入数据限制,然后您可以创建另一个值:
val inputStreamChanged = inputStream
.map( a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a)
.flatMap(new DoSthWithYourStream())
Examples extending Java Classed and applying Scala classes into the stream using map/flapmap/key etc
如果你想使用 CEP,那么我认为最好的选择是利用 CEP pattern API
val pattern = Pattern.begin("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(inputStream, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))
事实证明它需要一点 Scala 魔法。到目前为止我所做的是:
val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_))
def processPartition(key: String, window: TimeWindow,
batch: Iterable[EventStream],
out: Collector[Long]): Unit = {..}
根据我的实验,processPartition 方法对 "key partitioned" 的整个批次应用了一个函数(该批次将仅包含具有相同键的元素)。我从 Java API 中获取了此方法的参数。如果有人能详细说明 apply 函数及其工作原理,那将会很有用。
因此,根据您想进行的计算类型,基本上有两个可能的方向可以遵循。使用:fold
/reduce
/aggregate
或更通用的,您已经提到了 -apply
。所有这些都适用于 windows 一个密钥。
至于apply
,这是一种非常通用的应用计算方式。最基本的版本(在 Scala 中)是:
def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]
其中函数有 4 个参数:
- window 的密钥(记住您正在使用 keyedStream)
- window(您可以从中提取 window 的开头或结尾)
- 分配给此特定 window 和键的元素
- 一个收集器,您应该向其发送处理结果
必须记住,这个版本必须保持每个元素的状态,直到发出 window。更好的内存性能解决方案是使用带有 preAgreggator 的版本,它在触发上述函数之前执行一些计算。
在这里您可以看到一个带有预聚合的简短片段:
val stream: DataStream[(String,Int)] = ...
stream.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
.apply((e1, e2) => (e1._1, e1._2 + e2._2),
(key, window, in, out: Collector[(String, Long, Long, Int)]) => {
out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
})
计算会话中某个键的出现次数 windows。
所以基本上,如果您不需要 window 的元信息,我会坚持使用 fold
\ reduce
\ aggregate
如果它们足够的话。比考虑应用某种预聚合,如果这还不够,请查看最通用的 apply
。
更完整的例子你可以看看here.
我也是 flink 和流媒体的新手。我想将每个分区的某个函数应用于流的每个 window(使用事件时间)。到目前为止我所做的是:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("dataset.txt")
.map(transformStream(_))
.assignAscendingTimestamps(_.eventTime)
.keyBy(_.id)
.timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))
def transformStream(input: String): EventStream = {...}
case class EventStream(val eventTime: Long, val id: String, actualEvent: String)
我想做的是对每个 window 批次的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我已经看到该方法适用于 DataStream API 但我不明白它是如何工作的。在 Flink API 中它说它在 Scala 中是这样使用的:
inputStream.apply { WindowFunction }
谁能解释一下 apply 方法的作用或使用方法? Scala 中的示例将更可取。 apply 方法是否符合我的要求?
就我而言,您可以将 map / flatmap / keyBy 函数调用应用于有状态窗口数据 val inputStream
以更改数据。所以如果你要创建
class DoSthWithYourStream {...}
您可以在其中定义方法和输入数据限制,然后您可以创建另一个值:
val inputStreamChanged = inputStream
.map( a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a)
.flatMap(new DoSthWithYourStream())
Examples extending Java Classed and applying Scala classes into the stream using map/flapmap/key etc
如果你想使用 CEP,那么我认为最好的选择是利用 CEP pattern API
val pattern = Pattern.begin("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(inputStream, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))
事实证明它需要一点 Scala 魔法。到目前为止我所做的是:
val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_))
def processPartition(key: String, window: TimeWindow,
batch: Iterable[EventStream],
out: Collector[Long]): Unit = {..}
根据我的实验,processPartition 方法对 "key partitioned" 的整个批次应用了一个函数(该批次将仅包含具有相同键的元素)。我从 Java API 中获取了此方法的参数。如果有人能详细说明 apply 函数及其工作原理,那将会很有用。
因此,根据您想进行的计算类型,基本上有两个可能的方向可以遵循。使用:fold
/reduce
/aggregate
或更通用的,您已经提到了 -apply
。所有这些都适用于 windows 一个密钥。
至于apply
,这是一种非常通用的应用计算方式。最基本的版本(在 Scala 中)是:
def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]
其中函数有 4 个参数:
- window 的密钥(记住您正在使用 keyedStream)
- window(您可以从中提取 window 的开头或结尾)
- 分配给此特定 window 和键的元素
- 一个收集器,您应该向其发送处理结果
必须记住,这个版本必须保持每个元素的状态,直到发出 window。更好的内存性能解决方案是使用带有 preAgreggator 的版本,它在触发上述函数之前执行一些计算。
在这里您可以看到一个带有预聚合的简短片段:
val stream: DataStream[(String,Int)] = ...
stream.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
.apply((e1, e2) => (e1._1, e1._2 + e2._2),
(key, window, in, out: Collector[(String, Long, Long, Int)]) => {
out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
})
计算会话中某个键的出现次数 windows。
所以基本上,如果您不需要 window 的元信息,我会坚持使用 fold
\ reduce
\ aggregate
如果它们足够的话。比考虑应用某种预聚合,如果这还不够,请查看最通用的 apply
。
更完整的例子你可以看看here.