从 Spark 中的 cassandra Table 中删除
Delete from cassandra Table in Spark
我正在将 Spark 与 cassandra 结合使用。我正在从我的 table 中读取一些行,以便使用 PrimaryKey 删除主题。这是我的代码:
val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
select("a","b","c","d").
where("d=?", d).cache()
lines.foreach(r => {
val session: Session = connector.openSession
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
session.execute(delete)
session.close()
})
但是这种方法为每一行创建一个会话并且需要很多时间。那么是否可以使用 sc.CassandraTable 或比我的更好的其他解决方案删除我的行。
谢谢
我认为 Cassandra 连接器目前不支持 delete
。为了分摊连接设置的成本,推荐的方法是将操作应用于每个分区。
因此您的代码将如下所示:
lines.foreachPartition(partition => {
val session: Session = connector.openSession //once per partition
partition.foreach{elem =>
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+elem._1 +"' and ctid='"+elem._2+"'and cvid='"+elem._3+"';"
session.execute(delete)
}
session.close()
})
您也可以考虑使用 DELETE FROM ... WHERE pk IN (list)
并使用类似的方法为每个分区构建 list
。这将更加高效,但可能会破坏非常大的分区,因为列表将因此变得很长。在应用这个函数之前重新分区你的目标 RDD 会有所帮助。
您很久以前就问过这个问题,所以您可能已经找到了答案。 :P 只是为了分享,这是我在 Java 中所做的。这段代码非常适合我的本地 Cassandra 实例。但它不适用于我们的 BETA 或 PRODUCTION 实例,因为 我怀疑 那里有多个 Cassandra 数据库实例,而删除仅对 1 个实例有效,并且数据立即被复制回来。 :(
如果您能够让它在您的 Cassandra 生产环境中工作,请分享它的多个实例 运行!
public static void deleteFromCassandraTable(Dataset myData, SparkConf sparkConf){
CassandraConnector connector = CassandraConnector.apply(sparkConf);
myData.foreachPartition(partition -> {
Session session = connector.openSession();
while(partition.hasNext()) {
Row row = (Row) partition.next();
boolean isTested = (boolean) row.get(0);
String product = (String) row.get(1);
long reportDateInMillSeconds = ((Timestamp) row.get(2)).getTime();
String id = (String) row.get(3);
String deleteMyData = "DELETE FROM test.my_table"
+ " WHERE is_tested=" + isTested
+ " AND product='" + product + "'"
+ " AND report_date=" + reportDateInMillSeconds
+ " AND id=" + id + ";";
System.out.println("%%% " + deleteMyData);
ResultSet deleteResult = session.execute(deleteMyData);
boolean result = deleteResult.wasApplied();
System.out.println("%%% deleteResult =" + result);
}
session.close();
});
}
我正在将 Spark 与 cassandra 结合使用。我正在从我的 table 中读取一些行,以便使用 PrimaryKey 删除主题。这是我的代码:
val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
select("a","b","c","d").
where("d=?", d).cache()
lines.foreach(r => {
val session: Session = connector.openSession
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
session.execute(delete)
session.close()
})
但是这种方法为每一行创建一个会话并且需要很多时间。那么是否可以使用 sc.CassandraTable 或比我的更好的其他解决方案删除我的行。
谢谢
我认为 Cassandra 连接器目前不支持 delete
。为了分摊连接设置的成本,推荐的方法是将操作应用于每个分区。
因此您的代码将如下所示:
lines.foreachPartition(partition => {
val session: Session = connector.openSession //once per partition
partition.foreach{elem =>
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+elem._1 +"' and ctid='"+elem._2+"'and cvid='"+elem._3+"';"
session.execute(delete)
}
session.close()
})
您也可以考虑使用 DELETE FROM ... WHERE pk IN (list)
并使用类似的方法为每个分区构建 list
。这将更加高效,但可能会破坏非常大的分区,因为列表将因此变得很长。在应用这个函数之前重新分区你的目标 RDD 会有所帮助。
您很久以前就问过这个问题,所以您可能已经找到了答案。 :P 只是为了分享,这是我在 Java 中所做的。这段代码非常适合我的本地 Cassandra 实例。但它不适用于我们的 BETA 或 PRODUCTION 实例,因为 我怀疑 那里有多个 Cassandra 数据库实例,而删除仅对 1 个实例有效,并且数据立即被复制回来。 :(
如果您能够让它在您的 Cassandra 生产环境中工作,请分享它的多个实例 运行!
public static void deleteFromCassandraTable(Dataset myData, SparkConf sparkConf){
CassandraConnector connector = CassandraConnector.apply(sparkConf);
myData.foreachPartition(partition -> {
Session session = connector.openSession();
while(partition.hasNext()) {
Row row = (Row) partition.next();
boolean isTested = (boolean) row.get(0);
String product = (String) row.get(1);
long reportDateInMillSeconds = ((Timestamp) row.get(2)).getTime();
String id = (String) row.get(3);
String deleteMyData = "DELETE FROM test.my_table"
+ " WHERE is_tested=" + isTested
+ " AND product='" + product + "'"
+ " AND report_date=" + reportDateInMillSeconds
+ " AND id=" + id + ";";
System.out.println("%%% " + deleteMyData);
ResultSet deleteResult = session.execute(deleteMyData);
boolean result = deleteResult.wasApplied();
System.out.println("%%% deleteResult =" + result);
}
session.close();
});
}