如何将数据框中的数据存储在变量中以用作 cassandra select 中的参数?

How to store data from a dataframe in a variable to use as a parameter in a select in cassandra?

我有一个 Spark Structured Streaming 应用程序。应用程序从 kafka 接收数据,并应该使用这些值作为参数来处理来自 cassandra 数据库的数据。我的问题是如何使用输入数据帧(kafka)中的数据作为cassandra“select”中的“where”参数而不出现以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

这是我的 df 输入:

 val df = spark
  .readStream
  .format("kafka")
  .options(
    Map("kafka.bootstrap.servers"-> kafka_bootstrap,
      "subscribe" -> kafka_topic,
      "startingOffsets"-> "latest",
      "fetchOffset.numRetries"-> "5",
      "kafka.group.id"-> groupId
    ))
  .load()

每当我尝试将数据帧值存储在变量中以用作参数时,我都会收到此错误。

这是我创建的尝试将数据转换为变量的方法。随之而来的火花给出了我之前提到的错误:

def processData(messageToProcess: DataFrame): DataFrame = {

val messageDS: Dataset[Message] = messageToProcess.as[Message]

val listData: Array[Message] = messageDS.collect()

listData.foreach(x => println(x.country))

val mensagem = messageToProcess

mensagem

}

如错误消息所示,您必须使用 writeStream.start() 才能执行结构化流查询。 您不能在流数据帧上使用与批处理数据帧相同的操作(如 .collect().show().count()),请参阅 Spark Structured Streaming 文档的 Unsupported Operations section .

在您的情况下,您正尝试在流式数据集上使用 messageDS.collect(),这是不允许的。为实现此目标,您可以使用 foreachBatch 输出接收器来收集每个微批次所需的行:

streamingDF.writeStream.foreachBatch { (microBatchDf: DataFrame, batchId: Long) =>
    // Now microBatchDf is no longer a streaming dataframe
    // you can check with microBatchDf.isStreaming
    
    val messageDS: Dataset[Message] = microBatchDf.as[Message]

    val listData: Array[Message] = messageDS.collect()

    listData.foreach(x => println(x.country))
    
    // ...
}

当你需要使用Kafka中的数据来查询Cassandra中的数据时,这种操作就是典型的两个数据集之间的连接——你不需要调用.collect来查找条目,你只需要执行加入。这是非常典型的事情——用来自外部数据集的数据丰富 Kafka 中的数据,而 Cassandra 提供低延迟操作。

您的代码可能如下所示(您需要配置所谓的 DirectJoin,请参阅下面的 link):

import spark.implicits._
import org.apache.spark.sql.cassandra._

val df = spark.readStream.format("kafka")
  .options(Map(...)).load()
... decode data in Kafka into columns
val cassdata = spark.read.cassandraFormat("table", "keyspace").load
val joined = df.join(cassdata, cassdata("pk") === df("some_column"))
val processed = ... process joined data

val query = processed.writeStream.....output data somewhere...start()
query.awaitTermination()

detailed blog post了解如何在 Cassandra 中执行高效的数据连接。