使用 mapWithState Spark Streaming 过滤部分重复项
Filter partial duplicates with mapWithState Spark Streaming
我们有一个DStream,比如
val ssc = new StreamingContext(sc, Seconds(1))
val kS = KafkaUtils.createDirectStream[String, TMapRecord](
ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
mapPartitions(part => {
part.map(_.value())
}).
mapPartitions(part1 => {
part1.map(c => {
TMsg(1,
c.field1,
c.field2, //And others
c.startTimeSeconds
)
})
})
所以每个 RDD 都有一堆 TMsg
对象,其中包含一些(技术)关键字段,我可以用它来去重复 DStream。基本上,如果我们在一个或两个离散的 RDDs 中有两个具有相同 field1
和 field2
的 TMsg 对象,并且它们相差不到 1 秒(我们看startTimeSeconds
),这是一个 重复。
我查看了 mapWithState。
是的,我可以像
这样创建 K -> V DStream
val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
所以我可以使用该功能,但不明白如何使用它来过滤重复项。
Window 函数无济于事,我不能使用 (structured stream).deduplicate 函数,因为解决方案是用 DStreams 编写的。
有什么解决办法吗?谢谢
P.S。 Spark 版本为 2.2
您可以使用 mapWithState
. There is a good manual how to use Stateful Streaming。
在您的情况下,您可以:
1.Set 检查点:
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("path/to/persistent/storage")
2.Define更新函数:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
# you can update your state in any value you want
# it is just a marker that value not new
state.update(value.get)
Option((key, v))
case (_, _) if state.isTimingOut() => None
}
}
3.Make 状态规范:
val stateSpec =
StateSpec
.function(update _)
# it is important to define how long
# you want to check duplication
# in this example check interval is 1 second.
.timeout(Seconds(1))
4.Use它:
ks
# make key->value pairs
.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
.mapWithState(stateSpec)
如果您想取最后一个值,更新函数可能是:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
state.update(value.get)
None
case (_, _) if state.isTimingOut() => Option((key, value.get))
}
}
我们有一个DStream,比如
val ssc = new StreamingContext(sc, Seconds(1))
val kS = KafkaUtils.createDirectStream[String, TMapRecord](
ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
mapPartitions(part => {
part.map(_.value())
}).
mapPartitions(part1 => {
part1.map(c => {
TMsg(1,
c.field1,
c.field2, //And others
c.startTimeSeconds
)
})
})
所以每个 RDD 都有一堆 TMsg
对象,其中包含一些(技术)关键字段,我可以用它来去重复 DStream。基本上,如果我们在一个或两个离散的 RDDs 中有两个具有相同 field1
和 field2
的 TMsg 对象,并且它们相差不到 1 秒(我们看startTimeSeconds
),这是一个 重复。
我查看了 mapWithState。 是的,我可以像
这样创建 K -> V DStreamval mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
所以我可以使用该功能,但不明白如何使用它来过滤重复项。
Window 函数无济于事,我不能使用 (structured stream).deduplicate 函数,因为解决方案是用 DStreams 编写的。
有什么解决办法吗?谢谢
P.S。 Spark 版本为 2.2
您可以使用 mapWithState
. There is a good manual how to use Stateful Streaming。
在您的情况下,您可以:
1.Set 检查点:
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("path/to/persistent/storage")
2.Define更新函数:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
# you can update your state in any value you want
# it is just a marker that value not new
state.update(value.get)
Option((key, v))
case (_, _) if state.isTimingOut() => None
}
}
3.Make 状态规范:
val stateSpec =
StateSpec
.function(update _)
# it is important to define how long
# you want to check duplication
# in this example check interval is 1 second.
.timeout(Seconds(1))
4.Use它:
ks
# make key->value pairs
.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
.mapWithState(stateSpec)
如果您想取最后一个值,更新函数可能是:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
state.update(value.get)
None
case (_, _) if state.isTimingOut() => Option((key, value.get))
}
}