结构化流式传输性能和清除 parquet 文件
Structured streaming performance and purging the parquet files
我正在使用 Spark 结构化流从 Kafka 获取流数据。我需要汇总各种指标(比如 6 个指标)并编写为镶木地板文件。我确实看到指标 1 和指标 2 之间存在巨大的延迟。例如,如果最近更新了指标 1,则指标 2 是一小时前的数据。如何提高此性能以并行工作?
此外,我编写了应该由另一个应用程序读取的 Parquet 文件。如何不断清除旧的镶木地板信息?我应该有不同的申请吗?
Dataset<String> lines_topic = spark.readStream().format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
Dataset<Row> data= lines_topic.select(functions.from_json(lines_topic.col("value"), schema).alias(topics)); data.withWatermark(---).groupBy(----).count(); query = data.writeStream().format("parquet").option("path",---).option("truncate", "false").outputMode("append").option("checkpointLocation", checkpointFile).start();
由于每个查询 运行 独立于其他查询,您需要确保为每个查询提供足够的资源来执行。可能发生的情况是,如果您使用默认 FIFO scheduler,那么所有触发器都是 运行 顺序与并行。
正如在您的 SparkContext 中描述的那样 here you should set a FAIR scheduler,然后为每个查询定义新池。
// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)
// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)
此外,在清除旧的 parquet 文件方面,您可能希望对数据进行分区,然后根据需要定期删除旧分区。否则,如果所有数据都写入同一输出路径,则不能只删除行。
我正在使用 Spark 结构化流从 Kafka 获取流数据。我需要汇总各种指标(比如 6 个指标)并编写为镶木地板文件。我确实看到指标 1 和指标 2 之间存在巨大的延迟。例如,如果最近更新了指标 1,则指标 2 是一小时前的数据。如何提高此性能以并行工作?
此外,我编写了应该由另一个应用程序读取的 Parquet 文件。如何不断清除旧的镶木地板信息?我应该有不同的申请吗?
Dataset<String> lines_topic = spark.readStream().format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
Dataset<Row> data= lines_topic.select(functions.from_json(lines_topic.col("value"), schema).alias(topics)); data.withWatermark(---).groupBy(----).count(); query = data.writeStream().format("parquet").option("path",---).option("truncate", "false").outputMode("append").option("checkpointLocation", checkpointFile).start();
由于每个查询 运行 独立于其他查询,您需要确保为每个查询提供足够的资源来执行。可能发生的情况是,如果您使用默认 FIFO scheduler,那么所有触发器都是 运行 顺序与并行。
正如在您的 SparkContext 中描述的那样 here you should set a FAIR scheduler,然后为每个查询定义新池。
// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)
// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)
此外,在清除旧的 parquet 文件方面,您可能希望对数据进行分区,然后根据需要定期删除旧分区。否则,如果所有数据都写入同一输出路径,则不能只删除行。