Spark UDF 使用 Cassandra 连接器查找密钥

Spark UDF To Look up Keys Using Cassandra Connector

我根据下面这个问题的回答得到了一些信息。 withSessionDo 重用每个节点上可用的底层 JVM 级会话 Spark Cassandra Connector proper usage

val connector = CassandraConnector(sparkConf) // I Know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
        val result = session.execute( stmt.bind(key) )
        MyCaseClass(
           fieldl1 = result.getString(0),
           fieldl2 = result.getInt(1)
           ...
        )
    }
})

会话不可序列化,因此我们无法在 udf 外部创建会话并将其传入,因此我们可以使用映射管理器将行转换为大小写 class 实例。 使用映射管理器的替代方法,

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val manager = new MappingManager(session)   // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
        val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
        mapperClass.get(key)
    }
})

我是 cassandra 的新手,所以请耐心回答我的几个问题。

  1. 这些方法中是否有我不知道的问题?
  2. 在第二种方法中,我知道我们在每次调用 UDF 时都会创建一个新的 MappingManager(session)。这是否仍会使用 jvm 级会话并打开更多会话? 每次调用都实例化 MappingManager 是否正确?该会话不可序列化,因此我无法在外部创建它并将其传递给 UDF。
  3. 将结果行转换为案例对象的其他方法有哪些Class?
  4. 有没有更好的替代方法来进行这种查找?

您正在尝试模拟 Spark Cassandra Connector (SCC) 在幕后所做的事情,但您的实施会比 SCC 慢得多,因为您使用的是同步 API,并且所有数据都是一个又一个,SCC使用异步API,并行拉取多行数据

实现您想要的最好方法是使用 Cassandra-optimized 连接(通常称为“直接连接”)。这种联接一直适用于 RDD API,但长期以来仅在商业版本的连接器中适用于 Dataframe API。但是从 SCC 2.5.0 开始(released in May 2020th), this functionality is also available in open source version, so you can use it instead of building its emulation. The direct join is performed only when you enable special Catalyst extensions,通过在配置 SparkSession 时传递 spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions(例如通过 command-line)。之后,您可以执行与 Cassandra table 的连接完整或部分主键,SCC 会自动将连接转换为对 Cassandra 的单独请求,这些请求执行得非常有效。您可以通过在连接的数据帧上执行 explain 来检查是否发生这种情况,因此您应该看到类似这样的内容(看对于字符串 Cassandra Direct Join):

scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
   +- *(1) Range (1, 5, step=1, splits=8)

我最近 wrote a long blog post 解释了如何使用 Dataframe 和 RDD APIs 在 Cassandra 中执行有效的数据连接 APIs - 我不想在这里重复它:-)