Spark 使用 Java API 的 cassandra 分区
Spark using cassandra partitioning with Java API
我有以下 table:
CREATE TABLE attribute (
pid text,
partner_id int,
key int,
value int,
PRIMARY KEY (pid, partner_id, key)
)
我正在尝试通过分区键 'pid' 来键入我的 RDD。
根据文档,我可以通过这样调用 keyBy 方法来做到这一点:
JavaPairRDD<String, Attribute> attrRdd =
javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
.select(column("pid"), column("partner_id"), column("key"), column("value"))
.keyBy(new Function<Attribute, String>() {
@Override
public String call(Attribute attr) throws Exception {
return attr.getPid();
}
});
但是,这不会创建 cassandra 分区程序。我唯一一次获得 cassandra 分区程序是当我按所有主键列键入时,如下所示:
JavaPairRDD<AttributePK, Attribute> attrRdd =
javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
.select(column("pid"), column("partner_id"), column("key"), column("value"))
.keyBy(JavaApiHelper.getClassTag(AttributePK.class),
mapRowTo(AttributePK.class), mapToRow(AttributePK.class), column("pid"));
AttributePK 将所有 PK 列作为成员:
public class AttributePK {
protected String pid;
protected int partner_id;
protected int key;
...
}
这对我不利,因为最终我想按 'pid' 对所有条目进行分组而不洗牌。
有人知道为什么我无法按照文档中所述仅按分区键列进行键控吗?
谢谢,
夏伊
您在这里使用了两个不同的 api。第一个示例传递函数,第二个示例传递列。当您传递一个函数时,api 无法知道 Key 中的列是什么,因此它无法构建分区程序。尝试对 api.
列使用 "KeyBy" 函数
此外,您可能不需要专门执行此操作,因为 spanBy
api 应该允许您免费对分区键执行 groupByKey/reduceByKey 操作。
我有以下 table:
CREATE TABLE attribute (
pid text,
partner_id int,
key int,
value int,
PRIMARY KEY (pid, partner_id, key)
)
我正在尝试通过分区键 'pid' 来键入我的 RDD。 根据文档,我可以通过这样调用 keyBy 方法来做到这一点:
JavaPairRDD<String, Attribute> attrRdd =
javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
.select(column("pid"), column("partner_id"), column("key"), column("value"))
.keyBy(new Function<Attribute, String>() {
@Override
public String call(Attribute attr) throws Exception {
return attr.getPid();
}
});
但是,这不会创建 cassandra 分区程序。我唯一一次获得 cassandra 分区程序是当我按所有主键列键入时,如下所示:
JavaPairRDD<AttributePK, Attribute> attrRdd =
javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
.select(column("pid"), column("partner_id"), column("key"), column("value"))
.keyBy(JavaApiHelper.getClassTag(AttributePK.class),
mapRowTo(AttributePK.class), mapToRow(AttributePK.class), column("pid"));
AttributePK 将所有 PK 列作为成员:
public class AttributePK {
protected String pid;
protected int partner_id;
protected int key;
...
}
这对我不利,因为最终我想按 'pid' 对所有条目进行分组而不洗牌。
有人知道为什么我无法按照文档中所述仅按分区键列进行键控吗?
谢谢,
夏伊
您在这里使用了两个不同的 api。第一个示例传递函数,第二个示例传递列。当您传递一个函数时,api 无法知道 Key 中的列是什么,因此它无法构建分区程序。尝试对 api.
列使用 "KeyBy" 函数此外,您可能不需要专门执行此操作,因为 spanBy
api 应该允许您免费对分区键执行 groupByKey/reduceByKey 操作。