没有数据时间戳的Spark结构化流聚合(基于触发器的聚合)
Spark structured streaming aggregation without timestamp on data (aggregation based on trigger)
我需要根据 spark 驱动程序时间戳对传入数据执行聚合,不带水印。我的数据没有任何时间戳字段。
要求是:计算每秒接收到的数据的平均值(发送时间无关紧要)
例如,我需要对每个触发器接收到的数据进行聚合,就像之前的 RDD 流一样 API。
有办法吗?
"Trigger by processing time"符合您的要求吗?
"Trigger by processing time" 触发每个间隔(由代码定义)。
示例触发代码如下。
您可以创建自己的接收器并在每次调用 addBatch() 时执行您的操作:
class CustomSink extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"RESULT=$v"))
}
}
class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new PersonSink()
}
def shortName(): String = "person"
}
输出模式设置为更新且每 X 秒触发一次
val query = ds.writeStream
.trigger(Trigger.ProcessingTime("1 seconds"))
.outputMode(OutputMode.Update())
.format("exactlyonce.CustomSinkProvider")
我需要根据 spark 驱动程序时间戳对传入数据执行聚合,不带水印。我的数据没有任何时间戳字段。
要求是:计算每秒接收到的数据的平均值(发送时间无关紧要)
例如,我需要对每个触发器接收到的数据进行聚合,就像之前的 RDD 流一样 API。
有办法吗?
"Trigger by processing time"符合您的要求吗? "Trigger by processing time" 触发每个间隔(由代码定义)。
示例触发代码如下。
您可以创建自己的接收器并在每次调用 addBatch() 时执行您的操作:
class CustomSink extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"RESULT=$v"))
}
}
class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new PersonSink()
}
def shortName(): String = "person"
}
输出模式设置为更新且每 X 秒触发一次
val query = ds.writeStream
.trigger(Trigger.ProcessingTime("1 seconds"))
.outputMode(OutputMode.Update())
.format("exactlyonce.CustomSinkProvider")