关于 spark + cassandra 的性能相关问题(JAVA 代码)

Perfomance related question on spark + cassandra (JAVA code)

我正在使用 cassandra 作为我的垃圾场,我在上面有多个作业 运行 处理数据并更新不同的系统。以下是与工作相关的过滤器

作业1.根据active_flag和update_date_time和expiry_time进行数据过滤,并对过滤后的数据进行处理。

Job 2. 基于update_date_time的数据过滤器处理数据

作业 3. 基于 created_date_time 和活动标志

的数据过滤器

where 条件 运行 所在的数据库列(一个查询中的一个或多个列)

  1. 活跃 -> yes/no
  2. created_date -> 时间戳
  3. expiry_time -> 时间戳
  4. updated_date -> 时间戳

我对这些条件的问题:-

  1. 我应该如何形成我的 Cassandra 主键?因为我看不出有什么方法可以实现唯一性(id 存在,但我不需要处理数据)。

  2. 如果我使用 table 扫描对 spark 代码进行过滤,我还需要主键吗?

考虑处理数百万条记录。

回答你的问题 - 你需要有一个主键,即使它只包含分区键:-)

更详细的答案实际上取决于这些作业的频率 运行、总体数据量、集群中的节点数、使用的硬件等。通常,我们试图推送尽可能多地过滤 Cassandra,因此它将 return 仅相关数据,而不是所有数据。这种过滤最有效的发生在第一个聚类列上,例如,如果我只想处理新创建的条目,那么我可以使用具有以下结构的 table:

create table test.test (
  pk int,
  tm timestamp,
  c2 int,
  v1 int,
  v2 int,
  primary key(pk, tm, c2));

然后我可以使用以下方法仅获取新创建的条目:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("test", "test").load()
val filtered = data.filter("tm >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")

或者我可以获取给定时间段内的条目:

val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp)
  AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")

可以通过在数据帧上执行 explain 来检查过滤器下推的效果,并检查 PushedFilters 部分 - 标记为 * 的条件将在 Cassandra 上执行边...

但设计 table 并不总是可以匹配所有查询,因此您需要为最常执行的作业设计主键。在你的情况下, update_date_time 可能是一个很好的候选者,但如果你把它作为聚类列,那么你在更新它时需要小心 - 你需要批量执行更改,比如这个:

begin batch
delete from table where pk = ... and update_date_time = old_timestamp;
insert into table (pk, update_date_time, ...) values (..., new_timestamp, ...);
apply batch;

或类似的东西。