Flink 应用程序的垃圾收集

Garbage Collection on Flink Applications

我在 Scala 中有一个非常简单的 Flink 应用程序。我有 2 个简单的流。我正在将我的一个流广播到另一个流。广播流包含规则,只是检查另一个流的元组是否在规则内。一切正常,我的代码如下所示。

这是一个无限的运行应用程序。我想知道 JVM 是否有可能将我的 rules 对象作为垃圾收集。

有人知道吗?非常感谢。

object StreamBroadcasting extends App {
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
  val stream = env
    .socketTextStream("localhost", 9998)
    .flatMap(_.toLowerCase.split("\W+").filter(_.nonEmpty))
    .keyBy(l => l)

  val ruleStream = env
    .socketTextStream("localhost", 9999)
    .flatMap(_.toLowerCase.split("\W+").filter(_.nonEmpty))

  val broadcastStream: DataStream[String] = ruleStream.broadcast

  stream.connect(broadcastStream)
    .flatMap(new SimpleConnect)
    .print

  class SimpleConnect extends RichCoFlatMapFunction[String, String, (String, Boolean)] {
    private var rules: Set[String] = Set.empty[String] // Can JVM collect this object after a long time?

    override def open(parameters: Configuration): Unit = {}

    override def flatMap1(value: String, out: Collector[(String, Boolean)]): Unit = {
      out.collect(value, rules.contains(value))
    }

    override def flatMap2(value: String, out: Collector[(String, Boolean)]): Unit = {
      rules = rules.+(value)
    }
  }

  env.execute("flink-broadcast-streams")
}

不,规则集不会被垃圾回收。它会永远存在。 (当然,因为你没有使用 Flink 的广播状态,所以规则不会在应用程序重启后继续存在。)