将行拆分为多行以限制列中数组的长度(spark / scala)

Split row into multiple rows to limit length of array in column (spark / scala)

我有一个如下所示的数据框:

+--------------+--------------------+
|id            |           items    |
+--------------+--------------------+
|             1|[a, b, .... x, y, z]|
+--------------+--------------------+
|             1|[q, z, .... x, b, 5]|
+--------------+--------------------+
|             2|[q, z, .... x, b, 5]|
+--------------+--------------------+

我想拆分行,使 items 列中的数组的长度最多为 20。如果数组的长度大于 20,我想创建新行并将数组拆分以便每个数组的长度为 20 或更短。因此,对于我的示例数据框中的第一行,如果我们假设长度为 10 并且我希望每行的长度最多为 3,我希望它像这样拆分:

+--------------+--------------------+
|id            |           items    |
+--------------+--------------------+
|             1|[a, b, c]           |
+--------------+--------------------+
|             1|[z, y, z]           |
+--------------+--------------------+
|             1|[e, f, g]           |
+--------------+--------------------+
|             1|[q]                 |
+--------------+--------------------+

理想情况下,如果数组的长度不能被所需的最大长度整除,除最后一行外所有行的长度都应为 3。注意 - id 列不是唯一的

你可以试试这个:

import pandas as pd

max_item_length = 3

df = pd.DataFrame(
    {"fake_index": [1, 2, 3],
     "items": [["a", "b", "c", "d", "e"], ["f", "g", "h", "i", "j"], ["k", "l"]]}
)

df2 = pd.DataFrame({"fake_index": [], "items": []})

for i in df.index:
    try:
        df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1][:max_item_length]},
                         ignore_index=True)
        df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1][max_item_length:]},
                         ignore_index=True)
    except:
        df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1]}, ignore_index=True)

df = df2

print(df)

输入:

   fake_index            items
0           1  [a, b, c, d, e]
1           2  [f, g, h, i, j]
2           3           [k, l]

输出:

   fake_index      items
0           1  [a, b, c]
1           1     [d, e]
2           2  [f, g, h]
3           2     [i, j]
4           3     [k, l]

使用 higher-order 函数 transform + filterslice,您可以将数组拆分为大小为 20 的子数组,然后展开它:

val l = 20

val df1 = df.withColumn(
  "items",
  explode(
    expr(
      s"filter(transform(items, (x,i)-> IF(i%$l=0, slice(items,i+1,$l), null)), x-> x is not null)"
    )
  )
)

由于这需要更复杂的转换,因此我使用了数据集。这可能没有那么高效,但它会得到你想要的。

设置

正在创建一些样本数据来模仿您的数据。

val arrayData = Seq(
  Row(1,List(1, 2, 3, 4, 5, 6, 7)),
  Row(2,List(1, 2, 3, 4)),
  Row(3,List(1, 2)),
  Row(4,List(1, 2, 3))
)

val arraySchema = new StructType().add("id",IntegerType).add("values", ArrayType(IntegerType))

val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
/*
 +---+---------------------+
 |id |values               |
 +---+---------------------+
 |1  |[1, 2, 3, 4, 5, 6, 7]|
 |2  |[1, 2, 3, 4]         |
 |3  |[1, 2]               |
 |4  |[1, 2, 3]            |
 +---+---------------------+
*/


变换

// encoder for custom type of transformation
implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]

// Here we are using a sliding window of size 3 and step 3. 
// This can be made into a generic function for a window of size k.
val df2 = df.map(r => {
  val id = r.getInt(0)
  val a = r.getSeq[Int](1).toArray
  val arrays = a.sliding(3, 3).toArray
  (id, arrays)
})
/*
 +---+---------------------------------------------------------------+
 |_1 |_2                                                             |
 +---+---------------------------------------------------------------+
 |1  |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
 |2  |[WrappedArray(1, 2, 3), WrappedArray(4)]                       |
 |3  |[WrappedArray(1, 2)]                                           |
 |4  |[WrappedArray(1, 2, 3)]                                        |
 +---+---------------------------------------------------------------+
*/

val df3 = df2
  .withColumnRenamed("_1", "id")
  .withColumnRenamed("_2", "values")
/*
 +---+---------------------------------------------------------------+
 |id |values                                                         |
 +---+---------------------------------------------------------------+
 |1  |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
 |2  |[WrappedArray(1, 2, 3), WrappedArray(4)]                       |
 |3  |[WrappedArray(1, 2)]                                           |
 |4  |[WrappedArray(1, 2, 3)]                                        |
 +---+---------------------------------------------------------------+
*/

使用爆炸

Expode 将为第二列中的每个数组条目创建一个新元素。

val df4 = df3.withColumn("values", functions.explode($"values"))
/*
 +---+---------+
 |id |values   |
 +---+---------+
 |1  |[1, 2, 3]|
 |1  |[4, 5, 6]|
 |1  |[7]      |
 |2  |[1, 2, 3]|
 |2  |[4]      |
 |3  |[1, 2]   |
 |4  |[1, 2, 3]|
 +---+---------+
*/


限制

这种方法并非没有局限性。

主要是,它在较大的数据集上性能不佳,因为此代码不再使用数据帧 built-in 优化。但是,数据框 API 可能需要使用 window 函数,根据数据的大小,这些函数的性能也会受到限制。如果可以在源头更改此数据,建议这样做。

这种方法还需要为更复杂的事物定义编码器。如果数据模式发生变化,则必须使用不同的编码器。