没有数据时间戳的Spark结构化流聚合(基于触发器的聚合)

Spark structured streaming aggregation without timestamp on data (aggregation based on trigger)

我需要根据 spark 驱动程序时间戳对传入数据执行聚合,不带水印。我的数据没有任何时间戳字段。

要求是:计算每秒接收到的数据的平均值(发送时间无关紧要)

例如,我需要对每个触发器接收到的数据进行聚合,就像之前的 RDD 流一样 API。

有办法吗?

"Trigger by processing time"符合您的要求吗? "Trigger by processing time" 触发每个间隔(由代码定义)。

示例触发代码如下。

https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala#L34

您可以创建自己的接收器并在每次调用 addBatch() 时执行您的操作:

class CustomSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"RESULT=$v"))
  }
}

class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new PersonSink()
  }

  def shortName(): String = "person"
}

输出模式设置为更新且每 X 秒触发一次

  val query = ds.writeStream
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .outputMode(OutputMode.Update())
    .format("exactlyonce.CustomSinkProvider")