如何确保 spark 不会从 cassandra 读取相同的数据两次

How to ensure spark not read same data twice from cassandra

我正在学习 spark 和 cassandra。我的问题如下。 我有 cassandra table 记录来自传感器的行数据

CREATE TABLE statistics.sensor_row (
    name text,
    date timestamp,
    value int,
    PRIMARY KEY (name, date)
) 

现在我想通过 spark 批处理作业(即每天)聚合这些行

所以我可以写

val rdd = sc.cassandraTable("statistics","sensor_row")
//and do map and reduce to get what i want and perhaps write back to aggregated table. 

但我的问题是我会定期 运行ning 这段代码。我需要确保我不会读取相同的数据两次。

我能做的一件事是删除我读过的行,这些行看起来很丑,或者使用过滤器

sensorRowRDD.where("date >'2016-02-05 07:32:23+0000'")

第二个看起来好多了,但是我需要记录作业 运行 最后一次是什么时候,然后从那里继续。然而,根据 DataStax 驱动数据本地化,每个 worker 只会在其本地 cassandra 节点中加载数据。这意味着我需要跟踪每个 cassandra/spark 节点的日期,而不是跟踪全局日期。看起来还是不太优雅。

有更好的方法吗?

DataFrame 过滤器将被下推到 Cassandra,因此这是解决问题的有效方法。但是你担心一致性问题是对的。

一个解决方案是不仅要设置开始日期,还要设置结束日期。当你的工作开始时,它会看时钟。是2016-02-05 12:00。也许您在收集迟到的数据时有几分钟的延迟,而且时钟也不是绝对精确的。您决定使用 10 分钟的延迟并将结束时间设置为 2016-02-05 11:50。您将其记录在 file/database 中。上一个运行的结束时间是2016-02-04 11:48。所以你的过滤器是 date > '2016-02-04 11:48' and date < '2016-02-05 11:50'.

因为日期范围涵盖了所有时间,所以您只会错过处理完范围后保存到过去范围内的事件。如果这种情况经常发生,您可以将延迟从 10 分钟增加。