在 flink 的流映射中更新并发映射
Update concurrent map inside a stream map on flink
我有一个流不断流式传输某些键的最新值。
流 A:DataStream[(String,Double)]
我有另一个流想要获取每个进程调用的最新值。
我的方法是引入一个 concurrentHashMap
,它将由流 A 更新并由第二个流读取。
val rates = new concurrentHasMap[String,Double].asScala
val streamA : DataStream[(String,Double)]= ???
streamA.map(keyWithValue => rates(keyWithValue._1)= keyWithValue._2) //rates never gets updated
rates("testKey")=2 //this works
val streamB: DataStream[String] = ???
streamB.map(str=> rates(str) // rates does not contain the values of the streamA at this point
//some other functionality
)
是否可以从流更新并发映射?与另一个流共享数据的任何其他解决方案也是可以接受的
您尝试使用的行为不会以分布式方式工作,基本上如果您有 parellelism
> 1 它将不会工作。在您的代码中 rates
实际上已更新,但在并行运算符的不同实例中。
实际上,在这种情况下,您想要做的是使用 BroadcastState
,它旨在准确解决您面临的问题。
在您的特定用例中,它看起来像这样:
val streamA : DataStream[(String,Double)]= ???
val streamABroadcasted = streamA.broadcast(<Your Map State Definition>)
val streamB: DataStream[String] = ???
streamB.connect(streamABroadcasted)
然后您可以轻松地使用 BroadcastProcessFunction
来实现您的逻辑。有关广播状态模式的更多信息,请参见 here
我有一个流不断流式传输某些键的最新值。
流 A:DataStream[(String,Double)]
我有另一个流想要获取每个进程调用的最新值。
我的方法是引入一个 concurrentHashMap
,它将由流 A 更新并由第二个流读取。
val rates = new concurrentHasMap[String,Double].asScala
val streamA : DataStream[(String,Double)]= ???
streamA.map(keyWithValue => rates(keyWithValue._1)= keyWithValue._2) //rates never gets updated
rates("testKey")=2 //this works
val streamB: DataStream[String] = ???
streamB.map(str=> rates(str) // rates does not contain the values of the streamA at this point
//some other functionality
)
是否可以从流更新并发映射?与另一个流共享数据的任何其他解决方案也是可以接受的
您尝试使用的行为不会以分布式方式工作,基本上如果您有 parellelism
> 1 它将不会工作。在您的代码中 rates
实际上已更新,但在并行运算符的不同实例中。
实际上,在这种情况下,您想要做的是使用 BroadcastState
,它旨在准确解决您面临的问题。
在您的特定用例中,它看起来像这样:
val streamA : DataStream[(String,Double)]= ???
val streamABroadcasted = streamA.broadcast(<Your Map State Definition>)
val streamB: DataStream[String] = ???
streamB.connect(streamABroadcasted)
然后您可以轻松地使用 BroadcastProcessFunction
来实现您的逻辑。有关广播状态模式的更多信息,请参见 here