timeseries/tick 数据集的 Spark 转换

Spark transformation for timeseries/tick data set

我们在 hive 中有 table,它将每个交易日结束时的交易订单数据存储为 order_date。其他重要的列是 产品, 合同, 价格(下单价格), ttime(交易时间) 状态(插入、更新或删除) 价格(订单价格)

我们必须从主要 table 以逐笔报价的方式构建图表 table,其中包含从早上开市到那一天的每行(订单)的最大和最小价格订单时间。即对于一个给定的订单,我们将有 4 列填充为 maxPrice(最高价格到现在),maxpriceOrderId(最高价格的 orderid),minPrice 和 minPriceOrderId
这必须针对每个产品、合同,即该产品、合同的最高和最低价格。

在计算这些值时,我们需要排除所有关闭的订单 从聚合。即到目前为止所有订单价格的最大值和最小值,不包括状态为 "Remove"

的订单

我们正在使用:Spark 2.2,输入数据格式为镶木地板。 输入记录

输出记录

给出一个简单的 SQL 视图 - 问题通过自连接解决,看起来像这样: 使用 ttime 上的有序数据集,我们必须获得特定产品的最高和最低价格,从早上到该订单时间的每一行(订单)的合同。这将为每个 eod (order_date) 数据集批量启用 运行:

select mainSet.order_id,    mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.ttime,mainSet.status,
max(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as max_price,
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price desc,aggSet.ttime desc ) as maxOrderId
min(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as min_price as min_price
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price ,aggSet.ttime) as minOrderId
from order_table mainSet 
join order_table aggSet
ON (mainSet.produuct=aggSet.product,
mainSet.contract=aggSet.contract,
mainSet.ttime>=aggSet.ttime,
aggSet.status <> 'Remove')

在 Spark 中编写

我们从 spark sql 开始,如下所示:

val mainDF: DataFrame= sparkSession.sql("select * from order_table where order_date ='eod_date' ")

  val ndf=mainDf.alias("mainSet").join(mainDf.alias("aggSet"),
        (col("mainSet.product")===col("aggSet.product")
          && col("mainSet.contract")===col("aggSet.contract")
          && col("mainSet.ttime")>= col("aggSet.ttime")
          && col("aggSet.status") <> "Remove")
        ,"inner")
        .select(mainSet.order_id,mainSet.ttime,mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.status,aggSet.order_id as agg_orderid,aggSet.ttime as agg_ttime,price as agg_price) //Renaming of columns

  val max_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
  val min_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
  val maxPriceCol = max(col("agg_price")).over(max_window)
  val minPriceCol = min(col("agg_price")).over(min_window)
  val firstMaxorder = first_value(col("agg_orderid")).over(max_window.orderBy(col("agg_price").desc, col("agg_ttime").desc))
  val firstMinorder = first_value(col("agg_orderid")).over(min_window.orderBy(col("agg_price"), col("agg_ttime")))      


  val priceDF=  ndf.withColumn("max_price",maxPriceCol)
                    .withColumn("maxOrderId",firstMaxorder)
                    .withColumn("min_price",minPriceCol)
                    .withColumn("minOrderId",firstMinorder)

    priceDF.show(20)

音量统计:

平均 700 万条记录 每个组(产品、合同)的平均计数= 600K

这项工作 运行 持续了几个小时,只是没有 finishing.I 已经尝试增加内存和其他参数,但没有成功。 作业卡住了,很多时候我遇到内存问题 Container killed by YARN for exceeding memory limits. 4.9 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

另一种方法

对我们最低的组列(产品和合同)进行重新分区,然后按时在分区内排序,以便我们按时收到为 mapPartition 函数排序的每一行。

在分区级别维护集合(键为 order_id,价格为值)的同时执行 mappartition,以计算最高和最低价格及其 orderid。

当我们收到订单时,我们将继续从收集中删除状态为 "Remove" 的订单。 一旦在 mapparition 中更新了给定行的集合,我们就可以从集合和 return 更新的行中计算最大值和最小值。

val mainDF: DataFrame= sparkSession.sql("select order_id,product,contract,order_date,price,status,null as maxPrice,null as maxPriceOrderId,null as minPrice,null as minPriceOrderId from order_table where order_date ='eod_date' ").repartitionByRange(col("product"),col("contract"))

case class summary(order_id:String ,ttime:string,product:String,contract :String,order_date:String,price:BigDecimal,status :String,var maxPrice:BigDecimal,var maxPriceOrderId:String ,var minPrice:BigDecimal,var minPriceOrderId String)

val summaryEncoder = Encoders.product[summary]
val priceDF= mainDF.as[summary](summaryEncoder).sortWithinPartitions(col("ttime")).mapPartitions( iter => {
    //collection at partition level
    //key as order_id and value as price
    var priceCollection = Map[String, BigDecimal]()

    iter.map( row => {
        val orderId= row.order_id
        val rowprice= row.price

        priceCollection = row.status match {
                            case "Remove" => if (priceCollection.contains(orderId)) priceCollection -= orderId
                            case _ => priceCollection += (orderId -> rowPrice)
                         }

        row.maxPrice = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._2  // Gives key,value tuple from collectin for  max value )
        row.maxPriceOrderId = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._1

        row.minPrice =  if(priceCollection.size > 0) priceCollection.minBy(_._2)._2   // Gives key,value tuple from collectin for  min value )
        row.minPriceOrderId = if(priceCollection.size > 0) priceCollection.minBy(_._2)._1

      row

    })
  }).show(20)

对于较小的数据集,运行宁可在 20 分钟内完成,但我发现对于 23 条工厂记录(有 17 条差异产品和合同),结果似乎不正确。我可以看到来自 mappartition 的一个分区(输入拆分)的数据正在进入另一个分区,从而弄乱了值。

--> 我们能否实现一种情况,在这种情况下,我可以保证每个 mappartition 任务都会在此处获取功能键(产品和合同)的所有数据。。 据我所知,mappartition 在每个 spark 分区上执行函数(类似于 map reduce 中的输入拆分),因此我如何强制 spark 创建具有该产品和合同组的所有值的 inputsplits/partitions。

--> 有没有其他方法可以解决这个问题

非常感谢您的帮助,因为我们被困在这里。

您用于重新分区数据的方法repartitionByRange 对这些列表达式上的数据进行分区,但进行范围分区。您想要的是对这些列进行散列分区。

将方法更改为 repartition 并将这些列传递给它,它应该确保相同的值组最终出现在一个分区中。

编辑:这里是 article 为什么很多小文件都是坏的

Why is poorly compacted data bad? Poorly compacted data is bad for Spark applications in the sense that it is extremely slow to process. Continuing with our previous example, anytime we want to process a day’s worth of events we have to open up 86,400 files to get to the data. This slows down processing massively because our Spark application is effectively spending most of its time just opening and closing files. What we normally want is for our Spark application to spend most of its time actually processing the data. We’ll do some experiments next to show the difference in performance when using properly compacted data as compared to poorly compacted data.


我敢打赌,如果您正确地将源数据分区到您要加入的任何地方并摆脱所有这些 windows,您最终会处于一个更好的位置。

每次您点击 partitionBy 时,您都在强制洗牌,而每次您点击 orderBy 时,您都在强制进行昂贵的排序。

我建议您查看数据集 API 并学习一些 groupBy 和 flatMapGroups/reduce/sliding 用于 O(n) 时间计算。您可以一次性获得 min/max。

此外,听起来您的驱动程序 运行由于许多小文件问题而导致内存不足。尝试尽可能地压缩源数据并对表进行适当分区。在这种特殊情况下,我建议按 order_date(也许每天?)进行分区,然后对产品和合同进行子分区。

这是我花了大约 30 分钟写的一个片段,可能 运行 比您的窗口函数好得多。它应该在 O(n) 时间内 运行 但如果你有很多小文件问题,它不能弥补。如果有任何遗漏,请告诉我。

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.mutable

case class Summary(
  order_id: String,
  ttime: String,
  product: String,
  contract: String,
  order_date: String,
  price: BigDecimal,
  status: String,
  maxPrice: BigDecimal = 0,
  maxPriceOrderId: String = null,
  minPrice: BigDecimal = 0,
  minPriceOrderId: String = null
)

class Workflow()(implicit spark: SparkSession) {

  import MinMaxer.summaryEncoder

  val mainDs: Dataset[Summary] =
    spark.sql(
      """
        select order_id, ttime, product, contract, order_date, price, status
        from order_table where order_date ='eod_date'
      """
    ).as[Summary]

  MinMaxer.minMaxDataset(mainDs)
}

object MinMaxer {

  implicit val summaryEncoder: Encoder[Summary] = Encoders.product[Summary]
  implicit val groupEncoder: Encoder[(String, String)] = Encoders.product[(String, String)]

  object SummaryOrderer extends Ordering[Summary] {
    def compare(x: Summary, y: Summary): Int = x.ttime.compareTo(y.ttime)
  }

  def minMaxDataset(ds: Dataset[Summary]): Dataset[Summary] = {
    ds
      .groupByKey(x => (x.product, x.contract))
      .flatMapGroups({ case (_, t) =>
        val sortedRecords: Seq[Summary] = t.toSeq.sorted(SummaryOrderer)

        generateMinMax(sortedRecords)
      })
  }

  def generateMinMax(summaries: Seq[Summary]): Seq[Summary] = {
    summaries.foldLeft(mutable.ListBuffer[Summary]())({case (b, summary) =>

      if (b.lastOption.nonEmpty) {
        val lastSummary: Summary = b.last

        var minPrice: BigDecimal = 0
        var minPriceOrderId: String = null
        var maxPrice: BigDecimal = 0
        var maxPriceOrderId: String = null

        if (summary.status != "remove") {
          if (lastSummary.minPrice >= summary.price) {
            minPrice = summary.price
            minPriceOrderId = summary.order_id
          } else {
            minPrice = lastSummary.minPrice
            minPriceOrderId = lastSummary.minPriceOrderId
          }

          if (lastSummary.maxPrice <= summary.price) {
            maxPrice = summary.price
            maxPriceOrderId = summary.order_id
          } else {
            maxPrice = lastSummary.maxPrice
            maxPriceOrderId = lastSummary.maxPriceOrderId
          }

          b.append(
            summary.copy(
              maxPrice = maxPrice,
              maxPriceOrderId = maxPriceOrderId,
              minPrice = minPrice,
              minPriceOrderId = minPriceOrderId
            )
          )
        } else {
          b.append(
            summary.copy(
              maxPrice = lastSummary.maxPrice,
              maxPriceOrderId = lastSummary.maxPriceOrderId,
              minPrice = lastSummary.minPrice,
              minPriceOrderId = lastSummary.minPriceOrderId
            )
          )
        }
      } else {
        b.append(
          summary.copy(
            maxPrice = summary.price,
            maxPriceOrderId = summary.order_id,
            minPrice = summary.price,
            minPriceOrderId = summary.order_id
          )
        )
      }

      b
    })
  }
}