Spark Structured Streaming 中的 LocalTableScan 有什么用?

What is LocalTableScan in Spark Structured Streaming for?

有谁知道 Spark Structured Streaming 中的 LocalTableScan 对应什么?

我试图了解我在本地 [*] 模式下 运行 的 Spark 结构流应用程序中观察到的奇怪行为。

我的机器上有 8 个内核。虽然我的大部分批次都有 8 个分区,但偶尔我会得到 16、32 或 56 等等 partitions/Tasks。我注意到它总是 8 的倍数。我在打开阶段选项卡时注意到,当它发生时,那是因为有多个 LocalTableScan。

也就是说,如果我有 2 个 LocalTableScan,那么小批量作业将有 16 个 task/partition,依此类推。

为了提供一些上下文,因为我怀疑它可能来自它,我正在使用 MemoryStream。

val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
val rdf = df.mapPartitions{ it => {.....}}(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))

我有一个未来可以满足我的记忆流:

Future {
    blocking {
      for (i <- 1 to 100000) {
        rows.addData(maps)
        Thread.sleep(3000)
      }
    }
  }

然后是我的查询:

rdf.writeStream.
    trigger(Trigger.ProcessingTime("1 seconds"))
    .format("console").outputMode("append")
    .queryName("SourceConvertor1").start().awaitTermination()

拜托,有什么建议吗?提示 ?

在Driver的内存中表示。如您的代码所示。