基于 cassandra table 分区键将数据保存在 spark 中
Keeping data together in spark based on cassandra table partition key
从 Cassandra table 加载数据时,spark 分区表示具有相同分区键的所有行。但是,当我使用相同的分区键在 spark 中创建数据并使用 .repartitionByCassandraReplica(..) 方法重新分区新的 RDD 时,它最终会出现在不同的 spark 分区中吗?如何使用 Spark-Cassandra 连接器定义的分区方案在 Spark 中实现一致的分区?
Links to download CQL and Spark job code that I tested
Version and other information
- 火花:1.3
- 卡桑德拉:2.1
- 连接器:1.3.1
- Spark 节点(5 个)和 Cass* 集群节点(4 个)运行在不同的数据中心
Code extract. Download code using above links for more details
第 1 步:将数据加载到 8 个 spark 分区
Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
.cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));
步骤 2:将数据重新划分为 8 个分区
.repartitionByCassandraReplica(
"testkeyspace",
"testtable",
partitionNumPerHost,
someColumns("id"),
mapToRow(TestTable.class, map));
第 3 步:打印两个 rdds 的分区 ID 和值
rdd.mapPartitionsWithIndex(...{
@Override
public Iterator<String> call(..) throws Exception {
List<String> list = new ArrayList<String>();
list.add("PartitionId-" + integer);
while (itr.hasNext()) {
TestTable value = itr.next();
list.add(Integer.toString(value.getId()));
}
return list.iterator();
}
}, true).collect();
第 4 步:打印在分区 1 上的结果快照。两个 Rdd 不同,但预计相同
加载 Rdd 值
----------------------------
Table load - PartitionId -1
----------------------------
15
22
--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16
Cassandra 副本的重新分区不会确定性地放置密钥。目前有一张票可以改变它。
https://datastax-oss.atlassian.net/projects/SPARKC/issues/SPARKC-278
现在的解决方法是将 Partitionspernode 参数设置为 1。
从 Cassandra table 加载数据时,spark 分区表示具有相同分区键的所有行。但是,当我使用相同的分区键在 spark 中创建数据并使用 .repartitionByCassandraReplica(..) 方法重新分区新的 RDD 时,它最终会出现在不同的 spark 分区中吗?如何使用 Spark-Cassandra 连接器定义的分区方案在 Spark 中实现一致的分区?
Links to download CQL and Spark job code that I tested
Version and other information
- 火花:1.3
- 卡桑德拉:2.1
- 连接器:1.3.1
- Spark 节点(5 个)和 Cass* 集群节点(4 个)运行在不同的数据中心
Code extract. Download code using above links for more details
第 1 步:将数据加载到 8 个 spark 分区
Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
.cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));
步骤 2:将数据重新划分为 8 个分区
.repartitionByCassandraReplica(
"testkeyspace",
"testtable",
partitionNumPerHost,
someColumns("id"),
mapToRow(TestTable.class, map));
第 3 步:打印两个 rdds 的分区 ID 和值
rdd.mapPartitionsWithIndex(...{
@Override
public Iterator<String> call(..) throws Exception {
List<String> list = new ArrayList<String>();
list.add("PartitionId-" + integer);
while (itr.hasNext()) {
TestTable value = itr.next();
list.add(Integer.toString(value.getId()));
}
return list.iterator();
}
}, true).collect();
第 4 步:打印在分区 1 上的结果快照。两个 Rdd 不同,但预计相同
加载 Rdd 值
----------------------------
Table load - PartitionId -1
----------------------------
15
22
--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16
Cassandra 副本的重新分区不会确定性地放置密钥。目前有一张票可以改变它。
https://datastax-oss.atlassian.net/projects/SPARKC/issues/SPARKC-278
现在的解决方法是将 Partitionspernode 参数设置为 1。