调整包含 explode 和 groupby 的代码的建议

suggestion for tune up the code which contains explode and groupby

我为以下问题编写了代码,但存在以下问题。如果可以做一些调整,请建议我。

  1. 我认为需要更多时间。
  2. 目前有 3 个品牌。它是硬编码的。如果要添加更多品牌,我需要手动添加代码。

输入数据框架构:

root
 |-- id: string (nullable = true)
 |-- attrib: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- pref: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pref_type: string (nullable = true)
 |    |    |-- brand: string (nullable = true)
 |    |    |-- tp_id: string (nullable = true)
 |    |    |-- aff: float (nullable = true)
 |    |    |-- pre_id: string (nullable = true)
 |    |    |-- cr_date: string (nullable = true)
 |    |    |-- up_date: string (nullable = true)
 |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

预期输出架构:

root
 |-- id: string (nullable = true)
 |-- attrib: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- pref: struct (nullable = false)
 |    |-- brandA: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- pref_type: string (nullable = true)
 |    |    |    |-- tp_id: string (nullable = true)
 |    |    |    |-- aff: float (nullable = true)
 |    |    |    |-- pref_id: string (nullable = true)
 |    |    |    |-- cr_date: string (nullable = true)
 |    |    |    |-- up_date: string (nullable = true)
 |    |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |-- brandB: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- pref_type: string (nullable = true)
 |    |    |    |-- tp_id: string (nullable = true)
 |    |    |    |-- aff: float (nullable = true)
 |    |    |    |-- pref_id: string (nullable = true)
 |    |    |    |-- cr_date: string (nullable = true)
 |    |    |    |-- up_date: string (nullable = true)
 |    |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |-- brandC: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- pref_type: string (nullable = true)
 |    |    |    |-- tp_id: string (nullable = true)
 |    |    |    |-- aff: float (nullable = true)
 |    |    |    |-- pref_id: string (nullable = true)
 |    |    |    |-- cr_date: string (nullable = true)
 |    |    |    |-- up_date: string (nullable = true)
 |    |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)

可以根据preferences下的品牌属性进行处理(preferences.brand)

我为此编写了以下代码:

def modifyBrands(inputDf: DataFrame): DataFrame ={
    val PreferenceProps = Array("pref_type", "tp_id", "aff", "pref_id", "cr_date", "up_date", "pref_attrib")
    import org.apache.spark.sql.functions._
    val explodedDf = inputDf.select(col("id"), explode(col("pref")))
        .select(
            col("id"),
            col("col.pref_type"),
            col("col.brand"),
            col("col.tp_id"),
            col("col.aff"),
            col("col.pre_id"),
            col("col.cr_dt"),
            col("col.up_dt"),
            col("col.pref_attrib")
        ).cache()

    val brandAddedDf = explodedDf
        .withColumn("brandA", when(col("brand") === "brandA", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandA"))
        .withColumn("brandB", when(col("brand") === "brandB", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandB"))
        .withColumn("brandC", when(col("brand") === "brandC", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandC"))
        .cache()

    explodedDf.unpersist()

    val groupedDf = brandAddedDf.groupBy("id").agg(
        collect_list("brandA").alias("brandA"),
        collect_list("brandB").alias("brandB"),
        collect_list("brandC").alias("brandC")
    ).withColumn("preferences", struct(
        when(size(col("brandA")).notEqual(0), col("brandA")).alias("brandA"),
        when(size(col("brandB")).notEqual(0), col("brandB")).alias("brandB"),
        when(size(col("brandC")).notEqual(0), col("brandC")).alias("brandC"),
    )).drop("brandA", "brandB", "brandC")
      .cache()
    brandAddedDf.unpersist()

    val idAttributesDf = inputDf.select("id", "attrib").cache()

    val joinedDf = idAttributesDf.join(groupedDf, "id")
    groupedDf.unpersist()
    idAttributesDf.unpersist()
    joinedDf.printSchema()
    joinedDf // returning joined df which will be wrote as paquet file.
  }

我认为它们是关于您如何编写代码的几个问题,但判断您的代码哪里有问题的真正方法是查看 SPARK UI。我发现“作业”选项卡和“SQL”选项卡非常有用,可以找出代码大部分时间花在哪里。然后看看是否可以重写这些部分以提高速度。我在下面指出的一些项目可能无关紧要,如果其他地方确实存在瓶颈,而这确实是大部分时间花费的地方。

创建嵌套结构是有原因的(就像您为 Brand 做的那样)。我只是不确定我在这里看到了回报,而且没有解释。应该考虑为什么要维护这个结构以及有什么好处。维护它有性能提升吗?或者它只是数据创建方式的产物?

可能有点帮助的一般提示:

一般来说,您应该只缓存将多次使用的代码。你有很多代码你不会多次使用但你仍然缓存。

很小很小的性能提升。 (所以换句话说,当你需要每毫秒......) withColumn 实际上没有 select 那样好。 (可能是由于某些对象的创建)在可能的情况下使用 select 而不是 withColumn。除非你真的需要每一毫秒,否则真的不值得重写你的代码。

您可以在数组上使用高阶函数 filter 来简化您的代码。只需通过品牌名称进行映射,并为每个 return 来自 pref 的过滤数组。这样你就避免了爆炸/分组部分。

这是一个完整的例子:

val data = """{"id":1,"attrib":{"key":"k","value":"v"},"pref":[{"pref_type":"type1","brand":"brandA","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandB","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandC","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}}]}"""
val inputDf = spark.read.json(Seq(data).toDS)

val brands = Seq("brandA", "brandB", "brandC")

// or getting them from input dataframe
// val brands = inputDf.select("pref.brand").as[Seq[String]].collect.flatten


val brandAddedDf = inputDf.withColumn(
  "pref",
  struct(brands.map(b => expr(s"filter(pref, x -> x.brand = '$b')").as(b)): _*)
)

brandAddedDf.printSchema
//root
// |-- attrib: struct (nullable = true)
// |    |-- key: string (nullable = true)
// |    |-- value: string (nullable = true)
// |-- id: long (nullable = true)
// |-- pref: struct (nullable = false)
// |    |-- brandA: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
// |    |-- brandB: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
// |    |-- brandC: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)