结构化流聚合 return 个错误值

Structured Streaming Aggregations return wrong values

我编写了一个结构化流聚合,它从 Kafka 源获取事件,执行简单计数并将它们写回 Cassandra 数据库。代码如下所示:

val data = stream
  .groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type"))
  .agg(functions.count("*").as("value"))

val query: StreamingQuery = data
  .writeStream
  .queryName("group-by-type")
  .format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider")
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type")
  .option("keyspace", "analytics")
  .option("table", "aggregations")
  .option("partitionKeyColumns", "project,type")
  .option("clusteringKeyColumns", "date")
  .start()

问题是每个批次的计数都刚刚结束。所以我会看到 Cassandra 中的计数下降。计数不应在一天内下降,我该如何实现?

编辑: 我也尝试过使用 window 聚合,同样的事情

所以这种情况下的错误实际上不在我的查询或 Spark 中。 为了弄清楚问题出在哪里,我使用了控制台接收器,但那个接收器没有显示问题。

问题出在我的 Cassandra 水槽中,看起来像这样:

class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
  }
}

它使用 Datastax Spark Cassandra 连接器写入数据帧。

问题是变量 data 包含流数据集。在 Spark 提供的 ConsoleSink 中,数据集在写入之前被复制到静态数据集中。所以我改变了它,现在它可以工作了。完成的版本如下所示:

class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val ds = data.sparkSession.createDataFrame(
      data.sparkSession.sparkContext.parallelize(data.collect()),
      data.schema
    )
    ds.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
  }
}