结构化流聚合 return 个错误值
Structured Streaming Aggregations return wrong values
我编写了一个结构化流聚合,它从 Kafka 源获取事件,执行简单计数并将它们写回 Cassandra 数据库。代码如下所示:
val data = stream
.groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type"))
.agg(functions.count("*").as("value"))
val query: StreamingQuery = data
.writeStream
.queryName("group-by-type")
.format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider")
.outputMode(OutputMode.Complete())
.option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type")
.option("keyspace", "analytics")
.option("table", "aggregations")
.option("partitionKeyColumns", "project,type")
.option("clusteringKeyColumns", "date")
.start()
问题是每个批次的计数都刚刚结束。所以我会看到 Cassandra 中的计数下降。计数不应在一天内下降,我该如何实现?
编辑:
我也尝试过使用 window 聚合,同样的事情
所以这种情况下的错误实际上不在我的查询或 Spark 中。
为了弄清楚问题出在哪里,我使用了控制台接收器,但那个接收器没有显示问题。
问题出在我的 Cassandra 水槽中,看起来像这样:
class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
data.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
}
}
它使用 Datastax Spark Cassandra 连接器写入数据帧。
问题是变量 data
包含流数据集。在 Spark 提供的 ConsoleSink 中,数据集在写入之前被复制到静态数据集中。所以我改变了它,现在它可以工作了。完成的版本如下所示:
class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val ds = data.sparkSession.createDataFrame(
data.sparkSession.sparkContext.parallelize(data.collect()),
data.schema
)
ds.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
}
}
我编写了一个结构化流聚合,它从 Kafka 源获取事件,执行简单计数并将它们写回 Cassandra 数据库。代码如下所示:
val data = stream
.groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type"))
.agg(functions.count("*").as("value"))
val query: StreamingQuery = data
.writeStream
.queryName("group-by-type")
.format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider")
.outputMode(OutputMode.Complete())
.option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type")
.option("keyspace", "analytics")
.option("table", "aggregations")
.option("partitionKeyColumns", "project,type")
.option("clusteringKeyColumns", "date")
.start()
问题是每个批次的计数都刚刚结束。所以我会看到 Cassandra 中的计数下降。计数不应在一天内下降,我该如何实现?
编辑: 我也尝试过使用 window 聚合,同样的事情
所以这种情况下的错误实际上不在我的查询或 Spark 中。 为了弄清楚问题出在哪里,我使用了控制台接收器,但那个接收器没有显示问题。
问题出在我的 Cassandra 水槽中,看起来像这样:
class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
data.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
}
}
它使用 Datastax Spark Cassandra 连接器写入数据帧。
问题是变量 data
包含流数据集。在 Spark 提供的 ConsoleSink 中,数据集在写入之前被复制到静态数据集中。所以我改变了它,现在它可以工作了。完成的版本如下所示:
class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val ds = data.sparkSession.createDataFrame(
data.sparkSession.sparkContext.parallelize(data.collect()),
data.schema
)
ds.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
}
}