Spark Structured Streaming 速率限制
Spark Structured Streaming rate limit
我正在尝试控制结构化流中每个触发器的记录。他们有什么功能吗?我尝试了不同的属性,但似乎没有任何效果。
import org.apache.spark.sql.streaming.Trigger
val checkpointPath = "/user/akash-singh.bisht@unilever.com/dbacademy/developer-foundations-capstone/checkpoint/orders"
// val outputPath = "/user/akash-singh.bisht@unilever.com/dbacademy/developer-foundations-capstone/raw/orders/stream"
val devicesQuery = df.writeStream
.outputMode("append")
.format("delta")
.queryName("orders")
.trigger(Trigger.ProcessingTime("1 second"))
.option("inputRowsPerSecond", 1)
.option("maxFilesPerTrigger", 1)
// .option("checkpointLocation", checkpointPath)
// .start(orders_checkpoint_path)
.option("checkpointLocation",checkpointPath)
.table("orders")
Delta 使用两个选项 maxFilesPerTrigger
和 maxBytesPerTrigger
。您已经使用了第一个,它优先于第二个。每个触发器处理的实际记录数取决于输入文件的大小和其中的记录数,因为 Delta 处理完整的文件,而不是将其分成多个块。
但是这些选项需要在 source Delta table 上指定,而不是像您现在指定的那样在接收器上指定:
spark.readStream.format("delta")
.option("maxFilesPerTrigger", "1")
.load("/delta/events")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "...")
.table("orders")
更新,只是为了证明该选项有效。
在目录/Users/user/tmp/abc/
中生成测试数据:
for i in {1..100}; do echo "{\"id\":$i}" > $i.json; done
然后 运行 测试,但使用 foreachBatch
映射哪个文件在其中处理 trigger/batch:
import pyspark.sql.functions as F
df = spark.readStream.format("json").schema("id int") \
.option("maxFilesPerTrigger", "1").load("/Users/user/tmp/abc/")
df2 = df.withColumn("file", F.input_file_name())
def feb(d, e):
d.withColumn("batch", F.lit(e)).write.format("parquet") \
.mode("append").save("2.parquet")
stream = df2.writeStream.outputMode("append").foreachBatch(feb).start()
# wait a minute or so
stream.stop()
bdf = spark.read.parquet("2.parquet")
# check content
>>> bdf.show(5, truncate=False)
+---+----------------------------------+-----+
|id |file |batch|
+---+----------------------------------+-----+
|100|file:///Users/user/tmp/abc/100.json|94 |
|99 |file:///Users/user/tmp/abc/99.json |19 |
|78 |file:///Users/user/tmp/abc/78.json |87 |
|81 |file:///Users/user/tmp/abc/81.json |89 |
|34 |file:///Users/user/tmp/abc/34.json |69 |
+---+----------------------------------+-----+
# check that each file came in a separate batch
>>> bdf.select("batch").dropDuplicates().count()
100
如果我将 maxFilesPerTrigger
增加到 2,那么我将得到 50 个批次,等等
我正在尝试控制结构化流中每个触发器的记录。他们有什么功能吗?我尝试了不同的属性,但似乎没有任何效果。
import org.apache.spark.sql.streaming.Trigger
val checkpointPath = "/user/akash-singh.bisht@unilever.com/dbacademy/developer-foundations-capstone/checkpoint/orders"
// val outputPath = "/user/akash-singh.bisht@unilever.com/dbacademy/developer-foundations-capstone/raw/orders/stream"
val devicesQuery = df.writeStream
.outputMode("append")
.format("delta")
.queryName("orders")
.trigger(Trigger.ProcessingTime("1 second"))
.option("inputRowsPerSecond", 1)
.option("maxFilesPerTrigger", 1)
// .option("checkpointLocation", checkpointPath)
// .start(orders_checkpoint_path)
.option("checkpointLocation",checkpointPath)
.table("orders")
Delta 使用两个选项 maxFilesPerTrigger
和 maxBytesPerTrigger
。您已经使用了第一个,它优先于第二个。每个触发器处理的实际记录数取决于输入文件的大小和其中的记录数,因为 Delta 处理完整的文件,而不是将其分成多个块。
但是这些选项需要在 source Delta table 上指定,而不是像您现在指定的那样在接收器上指定:
spark.readStream.format("delta")
.option("maxFilesPerTrigger", "1")
.load("/delta/events")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "...")
.table("orders")
更新,只是为了证明该选项有效。
在目录/Users/user/tmp/abc/
中生成测试数据:
for i in {1..100}; do echo "{\"id\":$i}" > $i.json; done
然后 运行 测试,但使用 foreachBatch
映射哪个文件在其中处理 trigger/batch:
import pyspark.sql.functions as F
df = spark.readStream.format("json").schema("id int") \
.option("maxFilesPerTrigger", "1").load("/Users/user/tmp/abc/")
df2 = df.withColumn("file", F.input_file_name())
def feb(d, e):
d.withColumn("batch", F.lit(e)).write.format("parquet") \
.mode("append").save("2.parquet")
stream = df2.writeStream.outputMode("append").foreachBatch(feb).start()
# wait a minute or so
stream.stop()
bdf = spark.read.parquet("2.parquet")
# check content
>>> bdf.show(5, truncate=False)
+---+----------------------------------+-----+
|id |file |batch|
+---+----------------------------------+-----+
|100|file:///Users/user/tmp/abc/100.json|94 |
|99 |file:///Users/user/tmp/abc/99.json |19 |
|78 |file:///Users/user/tmp/abc/78.json |87 |
|81 |file:///Users/user/tmp/abc/81.json |89 |
|34 |file:///Users/user/tmp/abc/34.json |69 |
+---+----------------------------------+-----+
# check that each file came in a separate batch
>>> bdf.select("batch").dropDuplicates().count()
100
如果我将 maxFilesPerTrigger
增加到 2,那么我将得到 50 个批次,等等