Flink Streaming: From one window, lookup state in another window
I have two streams:
- Measurements
- WhoMeasured (metadata about who took the measurement)
These are their case classes:
case class Measurement(var value: Int, var who_measured_id: Int)
case class WhoMeasured(var who_measured_id: Int, var name: String)
The Measurement stream carries a lot of data, the WhoMeasured stream very little. In fact, for each who_measured_id in the WhoMeasured stream only one name is relevant, so an older element can be discarded when a new element with the same who_measured_id arrives. This is essentially a hash table that is filled from the WhoMeasured stream.
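To illustrate the semantics I have in mind, here is a plain Scala sketch (not Flink code; onWhoMeasured and lookup are hypothetical helper names used only for this illustration): the latest name per who_measured_id wins, older entries are simply overwritten.

import scala.collection.mutable

val names = mutable.Map.empty[Int, String]

def onWhoMeasured(w: WhoMeasured): Unit =
  names(w.who_measured_id) = w.name   // a newer element replaces the old one

def lookup(m: Measurement): Option[String] =
  names.get(m.who_measured_id)        // name of whoever took the measurement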
In my custom window function:
class WFunc extends WindowFunction[Measurement, Long, Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow, input: Iterable[Measurement], out: Collector[Long]): Unit = {
    // Here I need access to the WhoMeasured stream to get the name of the person who took a measurement
    // The following two are equivalent since I keyed by who_measured_id
    val name_who_measured = magic(key)
    val name_who_measured = magic(input.head.who_measured_id)
  }
}
This is my job. As you can probably see, something is missing: the combination of the two streams.
val who_measured_stream = who_measured_source
  .keyBy(w => w.who_measured_id)
  .countWindow(1)

val measurement_stream = measurements_source
  .keyBy(m => m.who_measured_id)
  .timeWindow(Time.seconds(60), Time.seconds(5))
  .apply(new WFunc)
So essentially this is a kind of lookup table that gets updated whenever a new element arrives in the WhoMeasured stream. The question therefore is: how do I achieve such a lookup from one WindowedStream into another?
Follow-up:
After implementing it the way Fabian suggested, the job always fails with some kind of serialization issue:
[info] Loading project definition from /home/jgroeger/Code/MeasurementJob/project
[info] Set current project to MeasurementJob (in build file:/home/jgroeger/Code/MeasurementJob/)
[info] Compiling 8 Scala sources to /home/jgroeger/Code/MeasurementJob/target/scala-2.11/classes...
[info] Running de.company.project.Main dev MeasurementJob
[error] Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichCoFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
[error] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[error] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1478)
[error] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
[error] at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:230)
[error] at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:127)
[error] at de.company.project.jobs.MeasurementJob.run(MeasurementJob.scala:139)
[error] at de.company.project.Main$.main(Main.scala:55)
[error] at de.company.project.Main.main(Main.scala)
[error] Caused by: java.io.NotSerializableException: de.company.project.jobs.MeasurementJob
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
[error] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[error] ... 7 more
java.lang.RuntimeException: Nonzero exit code returned from runner: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last MeasurementJob/compile:run for the full output.
[error] (MeasurementJob/compile:run) Nonzero exit code returned from runner: 1
[error] Total time: 9 s, completed Nov 15, 2016 2:28:46 PM
Process finished with exit code 1
The error message:
The implementation of the RichCoFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
However, the only field of my JoiningCoFlatMap is the suggested ValueState. The signature looks like this:
class JoiningCoFlatMap extends RichCoFlatMapFunction[Measurement, WhoMeasured, (Measurement, String)] {
I think what you are looking for is a window operation followed by a join. You can implement the join of a high-volume stream with a low-volume, per-key-updated stream using a stateful CoFlatMapFunction, as in the example below:
val measures: DataStream[Measurement] = ???
val who: DataStream[WhoMeasured] = ???

val agg: DataStream[(Int, Long)] = measures
  .keyBy(_.who_measured_id)
  .timeWindow(Time.seconds(60), Time.seconds(5))
  .apply( (id: Int, w: TimeWindow, v: Iterable[Measurement], out: Collector[(Int, Long)]) => {
    // do your aggregation
  })

val joined: DataStream[(Int, Long, String)] = agg
  .keyBy(_._1) // who_measured_id
  .connect(who.keyBy(_.who_measured_id))
  .flatMap(new JoiningCoFlatMap)
// CoFlatMapFunction
class JoiningCoFlatMap extends RichCoFlatMapFunction[(Int, Long), WhoMeasured, (Int, Long, String)] {

  var names: ValueState[String] = null

  override def open(conf: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor[String](
      "whoMeasuredName",
      classOf[String],
      "" // default value
    )
    names = getRuntimeContext.getState(stateDescriptor)
  }

  override def flatMap1(a: (Int, Long), out: Collector[(Int, Long, String)]): Unit = {
    // join with state
    out.collect( (a._1, a._2, names.value()) )
  }

  override def flatMap2(w: WhoMeasured, out: Collector[(Int, Long, String)]): Unit = {
    // update state
    names.update(w.name)
  }
}
A note on the implementation: a CoFlatMapFunction cannot decide which input to process, i.e., whether flatMap1 or flatMap2 is called depends on which data arrives at the operator; this cannot be controlled by the function. This is a problem when the state is being initialized: at the beginning, the state might not yet hold the correct name for an arriving Measurement object and would return the default value instead. You can avoid this by buffering the measurements and joining them once the first update for their key arrives from the who stream. You need another piece of state for that; a sketch follows below.
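A minimal sketch of that buffering variant (my addition, not part of the original answer), assuming the same (Int, Long) aggregate type as above and a Flink version where keyed ListState is accessible through the RuntimeContext; BufferingJoiningCoFlatMap and the state names are hypothetical:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Buffers aggregates that arrive before the first WhoMeasured update for their key
// and flushes them as soon as the name becomes known.
class BufferingJoiningCoFlatMap
    extends RichCoFlatMapFunction[(Int, Long), WhoMeasured, (Int, Long, String)] {

  @transient private var name: ValueState[String] = _
  @transient private var buffer: ListState[(Int, Long)] = _

  override def open(conf: Configuration): Unit = {
    name = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("whoMeasuredName", classOf[String], null))
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[(Int, Long)]("bufferedAggregates", createTypeInformation[(Int, Long)]))
  }

  override def flatMap1(a: (Int, Long), out: Collector[(Int, Long, String)]): Unit = {
    val n = name.value()
    if (n == null) buffer.add(a)        // name not known yet for this key: buffer the aggregate
    else out.collect((a._1, a._2, n))   // name known: join directly
  }

  override def flatMap2(w: WhoMeasured, out: Collector[(Int, Long, String)]): Unit = {
    name.update(w.name)
    // flush everything that was buffered before the first update for this key arrived
    buffer.get().asScala.foreach(a => out.collect((a._1, a._2, w.name)))
    buffer.clear()
  }
}

One trade-off of this approach: the buffer grows unbounded for keys that never receive a WhoMeasured update, so you may want to cap or expire it depending on your data.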