是否可以在一个 Spark worker 中读取 Cassandra 分区的所有行?
Is it possible to read all rows of Cassandra partition in one Spark worker?
我正在尝试通过最小化它执行的 reduceByKey
步数,以更少的阶段将 Zipkin Dependencies Spark job 优化为 运行。从下面读取数据table:
CREATE TABLE IF NOT EXISTS zipkin.traces (
trace_id bigint,
ts timestamp,
span_name text,
span blob,
PRIMARY KEY (trace_id, ts, span_name)
)
在那里,单个分区 trace_id
包含完整的跟踪,并且包含从几行到几百行不等的任何地方。但是,整个分区由 Spark 作业转换为非常简单的 RDD[((String, String), Long)]
,将条目数从数十亿减少到几百。
不幸的是,当前代码是通过
独立读取所有行来完成的
sc.cassandraTable(keyspace, "traces")
并使用两个 reduceByKey
步骤得出 RDD[((String, String), Long)]
。如果有一种方法可以在一个 Spark 工作进程中一次性读取整个分区,并在内存中处理所有内容,那将是一个巨大的速度提升,无需 store/stream 庞大的数据集当前的第一阶段。
-- 编辑--
澄清一下,该作业必须从 table、数十亿个分区中读取 所有数据。
在不进行随机播放的情况下将所有分区数据保存在同一个 spark worker 上的关键是使用 spanByKey
CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts));
sc.cassandraTable("test", "events")
.spanBy(row => (row.getInt("year"), row.getInt("month")))
sc.cassandraTable("test", "events")
.keyBy(row => (row.getInt("year"), row.getInt("month")))
.spanByKey
如果没有随机播放,那么所有修改都将就地完成并作为迭代器一起流水线化。
请务必注意警告:
Note: This only works for sequentially ordered data. Because data is
ordered in Cassandra by the clustering keys, all viable spans must
follow the natural clustering key order.
我正在尝试通过最小化它执行的 reduceByKey
步数,以更少的阶段将 Zipkin Dependencies Spark job 优化为 运行。从下面读取数据table:
CREATE TABLE IF NOT EXISTS zipkin.traces (
trace_id bigint,
ts timestamp,
span_name text,
span blob,
PRIMARY KEY (trace_id, ts, span_name)
)
在那里,单个分区 trace_id
包含完整的跟踪,并且包含从几行到几百行不等的任何地方。但是,整个分区由 Spark 作业转换为非常简单的 RDD[((String, String), Long)]
,将条目数从数十亿减少到几百。
不幸的是,当前代码是通过
独立读取所有行来完成的sc.cassandraTable(keyspace, "traces")
并使用两个 reduceByKey
步骤得出 RDD[((String, String), Long)]
。如果有一种方法可以在一个 Spark 工作进程中一次性读取整个分区,并在内存中处理所有内容,那将是一个巨大的速度提升,无需 store/stream 庞大的数据集当前的第一阶段。
-- 编辑--
澄清一下,该作业必须从 table、数十亿个分区中读取 所有数据。
在不进行随机播放的情况下将所有分区数据保存在同一个 spark worker 上的关键是使用 spanByKey
CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts));
sc.cassandraTable("test", "events")
.spanBy(row => (row.getInt("year"), row.getInt("month")))
sc.cassandraTable("test", "events")
.keyBy(row => (row.getInt("year"), row.getInt("month")))
.spanByKey
如果没有随机播放,那么所有修改都将就地完成并作为迭代器一起流水线化。
请务必注意警告:
Note: This only works for sequentially ordered data. Because data is ordered in Cassandra by the clustering keys, all viable spans must follow the natural clustering key order.