从 Spark 中的 cassandra Table 中删除

Delete from cassandra Table in Spark

我正在将 Spark 与 cassandra 结合使用。我正在从我的 table 中读取一些行,以便使用 PrimaryKey 删除主题。这是我的代码:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
    session.execute(delete)
    session.close()
})

但是这种方法为每一行创建一个会话并且需要很多时间。那么是否可以使用 sc.CassandraTable 或比我的更好的其他解决方案删除我的行。

谢谢

我认为 Cassandra 连接器目前不支持 delete。为了分摊连接设置的成本,推荐的方法是将操作应用于每个分区。

因此您的代码将如下所示:

lines.foreachPartition(partition => {
    val session: Session = connector.openSession //once per partition
    partition.foreach{elem => 
        val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where     channel='"+elem._1 +"' and ctid='"+elem._2+"'and cvid='"+elem._3+"';"
        session.execute(delete)
    }
    session.close()
})

您也可以考虑使用 DELETE FROM ... WHERE pk IN (list) 并使用类似的方法为每个分区构建 list。这将更加高效,但可能会破坏非常大的分区,因为列表将因此变得很长。在应用这个函数之前重新分区你的目标 RDD 会有所帮助。

您很久以前就问过这个问题,所以您可能已经找到了答案。 :P 只是为了分享,这是我在 Java 中所做的。这段代码非常适合我的本地 Cassandra 实例。但它不适用于我们的 BETA 或 PRODUCTION 实例,因为 我怀疑 那里有多个 Cassandra 数据库实例,而删除仅对 1 个实例有效,并且数据立即被复制回来。 :(

如果您能够让它在您的 Cassandra 生产环境中工作,请分享它的多个实例 运行!

public static void deleteFromCassandraTable(Dataset myData, SparkConf sparkConf){
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    myData.foreachPartition(partition -> {
        Session session = connector.openSession();

        while(partition.hasNext()) {
            Row row = (Row) partition.next();
            boolean isTested = (boolean) row.get(0);
            String product = (String) row.get(1);
            long reportDateInMillSeconds = ((Timestamp) row.get(2)).getTime();
            String id = (String) row.get(3);

            String deleteMyData = "DELETE FROM test.my_table"
                    + " WHERE is_tested=" + isTested
                    + " AND product='" + product + "'"
                    + " AND report_date=" + reportDateInMillSeconds
                    + " AND id=" + id + ";";

            System.out.println("%%% " + deleteMyData);
            ResultSet deleteResult = session.execute(deleteMyData);
            boolean result = deleteResult.wasApplied();
            System.out.println("%%% deleteResult =" + result);
        }
        session.close();
    });
}