Unbounded table 是spark结构化流

Unbounded table is spark structured streaming

我开始学习 Spark,但很难理解 Spark 中结构化流背后的合理性。结构化流将所有到达的数据视为无界输入 table,其中数据流中的每个新项目都被视为 table 中的新行。我有以下代码将传入文件读入 csvFolder

val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

val csvSchema = new StructType().add("street", "string").add("city", "string")
.add("zip", "string").add("state", "string").add("beds", "string")
.add("baths", "string").add("sq__ft", "string").add("type", "string")
.add("sale_date", "string").add("price", "string").add("latitude", "string")
.add("longitude", "string")

val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")

val query = streamingDF.writeStream
  .format("console")
  .start()

如果我将一个 1GB 的文件转储到文件夹中会发生什么。根据规范,流作业每隔几毫秒触发一次。如果 Spark 在下一瞬间遇到这么大的文件,它会不会在尝试加载文件时 运行 内存不足。还是自动批处理?如果是,这个批处理参数是否可以配置?

example

关键思想是将任何数据流视为无界 table:添加到流中的新记录就像添加到 table 中的行一样。 这允许我们将批处理和流数据都视为 tables。由于 tables 和 DataFrames/Datasets 在语义上是同义词,因此可以将相同的类似批处理的 DataFrame/Dataset 查询应用于批处理数据和流数据。

在结构化流模型中,这是执行此查询的方式。

Question : If Spark encounters such a huge file in the next instant, won't it run out of memory while trying to load the file. Or does it automatically batch it? If yes, is this batching parameter configurable?

Answer : There is no point of OOM since it is RDD(DF/DS)lazily initialized. of course you need to re-partition before processing to ensure equal number of partitions and data spread across executors uniformly...