Spark Streaming Dataset Cassandra Connection UnsupportedOperationChecker

I am trying to write my streaming Dataset to Cassandra.

I have a streaming Dataset of the following case class:

case class UserSession(var id: Int,
                       var visited: List[String]
                      )

I also have the following keyspace/table in Cassandra (blog = keyspace, session = table):

CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy',    'replication_factor' : 1 };


CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);

I chose list<text> for visited because my visited field is of type List[String].

My ForeachWriter is as follows:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.{ForeachWriter, SparkSession}

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {

  /*
   * On every batch, on every partition `partitionId`, on every "epoch"
   * (chunk of data):
   *   - call the open method; if it returns false, skip this chunk
   *   - for each entry in the chunk, call the process method
   *   - call the close method, either at the end of the chunk or with
   *     the error if one was thrown
   */

  val keyspace = "blog"
  val table = "session"
  // Assumes the writer is constructed on the driver, where an active
  // SparkSession is available.
  val connector = CassandraConnector(SparkSession.active.sparkContext.getConf)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    println("Open connection")
    true
  }

  override def process(sess: UserSession): Unit = {
    connector.withSessionDo { session =>
      // Name both columns and render the Scala List as a CQL list
      // literal, e.g. ['a','b'].
      val visitedCql = sess.visited.map(v => s"'$v'").mkString("[", ",", "]")
      session.execute(
        s"""
           |insert into $keyspace.$table(id, visited)
           |values (${sess.id}, $visitedCql)
         """.stripMargin)
    }
  }

  override def close(errorOrNull: Throwable): Unit = println("Closing connection")
}
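As an aside, hand-building CQL strings like the one in process is fragile once values need quoting; a prepared statement lets the driver handle quoting and type conversion instead. A minimal sketch, assuming SCC 3.0's Java driver 4.x session and a hypothetical writeSession helper:

    import com.datastax.spark.connector.cql.CassandraConnector
    import scala.collection.JavaConverters._

    // Hypothetical helper: insert one UserSession via a prepared statement,
    // so the driver quotes the list<text> column instead of us.
    def writeSession(connector: CassandraConnector, sess: UserSession): Unit =
      connector.withSessionDo { session =>
        val stmt = session.prepare(
          "insert into blog.session(id, visited) values (?, ?)")
        session.execute(stmt.bind(Int.box(sess.id), sess.visited.asJava))
      }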

It might help to look at my process function, since that may be what throws the error. My main is the following:

finishedUserSessionsStream: Dataset[UserSession]

def main(args: Array[String]): Unit = {
  // make finishedUserSessionsStream ...

  finishedUserSessionsStream.writeStream
    .option("checkpointLocation", "checkpoint")
    .foreach(new SessionCassandraForeachWriter)
    .start()
    .awaitTermination()
}

This gives me the following error:

org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)

For Spark 3.0 with Spark Cassandra Connector 3.0.0 you shouldn't use foreach - that was a workaround for SCC < 2.5.0, which had no native support for writing streaming Datasets. Starting with SCC 2.5.0, you can just write data directly to Cassandra, like this (here is a full example):

    val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "checkpoint")
      .option("keyspace", "ks")
      .option("table", "table")
      .start()

You also need to switch to SCC 3.0.0-beta, which contains a lot of fixes.
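For completeness, a minimal sketch of the session setup this approach assumes; the app name, local master, and localhost contact point are illustrative assumptions:

    import org.apache.spark.sql.SparkSession

    // Submit with the connector on the classpath, e.g.:
    //   spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta ...
    val spark = SparkSession.builder()
      .appName("sessions-to-cassandra")                        // illustrative name
      .master("local[*]")                                      // assumption: local run
      .config("spark.cassandra.connection.host", "localhost")  // assumption: local node
      .getOrCreate()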