如何在我的 spark 结构化流媒体中获得两个不同的 cassandra 集群?

How can I get two different cassandra clusters in my spark structured streaming?

我有两个 cassandra 集群。如何在同一 SparkSession 中同时设置主机、密码和用户?我如何将它们与 CassandraConnector 一起使用?

我试过这个:

val cassandraCon: CassandraConnector = CassandraConnector(conf)

  val ks = "monitore"
  val ttableName = "validate_structure"

  def getIndex(): ResultSet = {

    val table = ks + "." + ttableName
    val query = s"""select *
                   |from ${table}""".stripMargin
    println(query)
    cassandraCon.withSessionDo(s => {
      s.execute(query)
    })
  }

但是,问题是这仅在 cassandra 集群与 spark 位于同一主机上时才有效。我也尝试创建一个目录,但我找不到用 session.execute 而不是 spark.sql 来发出请求的方法。

有人可以帮助我吗?我使用 Spark Structured Streaming 3.1.2 并使用 cassandra 连接来丰富我的数据。

第一个问题是 .withSessionDo 是 运行 仅在驱动程序的上下文中,而不是在执行程序的上下文中,因此不会分发。

您需要使用:

  • 来自RDD API的.joinWithCassandraTable function(也有它的左连接版本)
  • 或使用所谓的 DirectJoin (see details in the blog post of its author), when Spark Cassandra Connector (SCC) detects that one side of the join is in Cassandra, and convert it into queries to individual partitions. Unfortunately, Spark 3.1 broke the current version of DirectJoin in SCC (see JIRA),因此您可能需要使用 RDD API 直到它被修复。

detailed blog post了解如何在 Cassandra 中执行高效的数据连接。

关于这两个集群 - 完全可以为单个 read/write 操作指定连接详细信息,只需指定 .option("spark.cassandra.connection.host", "host-or-ip")。 (SCC 的主要开发者 Russell Spitzer blog posts on how to connect to multiple clusters). It also possible to do when you're using Catalog API - just append connection property name, like, spark.cassandra.connection.host to the specific catalog name (see docs