Spark 使用 Java API 的 cassandra 分区

Spark using cassandra partitioning with Java API

我有以下 table:

CREATE TABLE attribute (
    pid text,
    partner_id int,
    key int,
    value int,
    PRIMARY KEY (pid, partner_id, key)
)

我正在尝试通过分区键 'pid' 来键入我的 RDD。 根据文档,我可以通过这样调用 keyBy 方法来做到这一点:

JavaPairRDD<String, Attribute> attrRdd =
    javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
    .select(column("pid"), column("partner_id"), column("key"), column("value"))
    .keyBy(new Function<Attribute, String>() {
        @Override
        public String call(Attribute attr) throws Exception {
            return attr.getPid();
        }
    });

但是,这不会创建 cassandra 分区程序。我唯一一次获得 cassandra 分区程序是当我按所有主键列键入时,如下所示:

JavaPairRDD<AttributePK, Attribute> attrRdd =
    javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
    .select(column("pid"), column("partner_id"), column("key"), column("value"))
    .keyBy(JavaApiHelper.getClassTag(AttributePK.class),
        mapRowTo(AttributePK.class), mapToRow(AttributePK.class), column("pid"));

AttributePK 将所有 PK 列作为成员:

public class AttributePK {
    protected String pid;
    protected int partner_id;
    protected int key;
    ...
}

这对我不利,因为最终我想按 'pid' 对所有条目进行分组而不洗牌。

有人知道为什么我无法按照文档中所述仅按分区键列进行键控吗?

谢谢,

夏伊

您在这里使用了两个不同的 api。第一个示例传递函数,第二个示例传递列。当您传递一个函数时,api 无法知道 Key 中的列是什么,因此它无法构建分区程序。尝试对 api.

列使用 "KeyBy" 函数

此外,您可能不需要专门执行此操作,因为 spanBy api 应该允许您免费对分区键执行 groupByKey/reduceByKey 操作。