是否可以让两个 Spark 进程同时读取一个 Delta Table 中的流?
Is it possible to have two Spark process reading a stream in one Delta Table at the same time?
我在一个 Delta Table 中接收数据,我想让两个消费者处理读取数据:
我在不同的罐子里有这些代码:
1) 用于实时计算聚合的 Spark 进程。
val df_aggregations = spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.option("checkpointLocation", configuration.delta_aggregation_checkpoint)
.load(configuration.delta_table)
2) 实时获取新值的Spark进程。
val df_news = spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.option("checkpointLocation", configuration.delta_news_checkpoint)
.load(configuration.delta_table)
我的问题是我只看到其中一个流程运行良好,我的意思是如果我 运行 流程 1) 首先比 2) 我看到流程 1) 的良好结果,但是,我看不到过程 2) 的结果,如果我先 运行 过程 2),那么我会看到过程 2) 的结果,但看不到过程 1) 结果。
是的,您可以让 N 个消费者进程读取同一个增量 Table,只是您需要为每个进程设置不同的检查点文件。
我在一个 Delta Table 中接收数据,我想让两个消费者处理读取数据:
我在不同的罐子里有这些代码:
1) 用于实时计算聚合的 Spark 进程。
val df_aggregations = spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.option("checkpointLocation", configuration.delta_aggregation_checkpoint)
.load(configuration.delta_table)
2) 实时获取新值的Spark进程。
val df_news = spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.option("checkpointLocation", configuration.delta_news_checkpoint)
.load(configuration.delta_table)
我的问题是我只看到其中一个流程运行良好,我的意思是如果我 运行 流程 1) 首先比 2) 我看到流程 1) 的良好结果,但是,我看不到过程 2) 的结果,如果我先 运行 过程 2),那么我会看到过程 2) 的结果,但看不到过程 1) 结果。
是的,您可以让 N 个消费者进程读取同一个增量 Table,只是您需要为每个进程设置不同的检查点文件。