spark 中 cassandra 行的写入时间

writetime of cassandra row in spark

我正在将 spark 与 cassandra 一起使用,我想从我的 cassandra select table 我的行的 writeTime。这是我的要求:

   val lines = sc.cassandraTable[(String, String, String, Long)](CASSANDRA_SCHEMA, table).select("a", "b", "c", "writeTime(d)").count()

但它显示此错误:

java.io.IOException: Column channal not found in table test.mytable

我也试过这个请求

   val lines = sc.cassandraTable[(String, String, String, Long)](CASSANDRA_SCHEMA, table).select("a", "b", "c", WRITETIME("d")").count()

但它显示此错误:

<console>:25: error: not found: value WRITETIME

请问我怎样才能得到我的行的写入时间。 谢谢

编辑:这已在连接器的 1.2 版本中修复

当前,连接器不支持在从 Cassandra 读取时通过 CQL 函数。我已注意到这一点,并将启动一个用于实现此功能的工单。

https://datastax-oss.atlassian.net/browse/SPARKC-55

对于解决方法,您始终可以在操作中使用直接连接器,如

import com.datastax.spark.connector.cql.CassandraConnector

val cc = CassandraConnector(sc.getConf)
val select = s"SELECT WRITETIME(userId) FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)
ids.flatMap(id =>
      cc.withSessionDo(session =>
        session.execute(select, id.toInt: java.lang.Integer)

代码修改自

在cassandra-spark-connector 1.2中,您可以通过以下方式获取TTL和写入时间:

sc.cassandraTable(...).select("column1", WriteTime("column2"), TTL("column3"))

看看this票。

有关用法,请查看集成测试 here