将行拆分为多行以限制列中数组的长度(spark / scala)
Split row into multiple rows to limit length of array in column (spark / scala)
我有一个如下所示的数据框:
+--------------+--------------------+
|id | items |
+--------------+--------------------+
| 1|[a, b, .... x, y, z]|
+--------------+--------------------+
| 1|[q, z, .... x, b, 5]|
+--------------+--------------------+
| 2|[q, z, .... x, b, 5]|
+--------------+--------------------+
我想拆分行,使 items
列中的数组的长度最多为 20。如果数组的长度大于 20,我想创建新行并将数组拆分以便每个数组的长度为 20 或更短。因此,对于我的示例数据框中的第一行,如果我们假设长度为 10 并且我希望每行的长度最多为 3,我希望它像这样拆分:
+--------------+--------------------+
|id | items |
+--------------+--------------------+
| 1|[a, b, c] |
+--------------+--------------------+
| 1|[z, y, z] |
+--------------+--------------------+
| 1|[e, f, g] |
+--------------+--------------------+
| 1|[q] |
+--------------+--------------------+
理想情况下,如果数组的长度不能被所需的最大长度整除,除最后一行外所有行的长度都应为 3。注意 - id
列不是唯一的
你可以试试这个:
import pandas as pd
max_item_length = 3
df = pd.DataFrame(
{"fake_index": [1, 2, 3],
"items": [["a", "b", "c", "d", "e"], ["f", "g", "h", "i", "j"], ["k", "l"]]}
)
df2 = pd.DataFrame({"fake_index": [], "items": []})
for i in df.index:
try:
df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1][:max_item_length]},
ignore_index=True)
df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1][max_item_length:]},
ignore_index=True)
except:
df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1]}, ignore_index=True)
df = df2
print(df)
输入:
fake_index items
0 1 [a, b, c, d, e]
1 2 [f, g, h, i, j]
2 3 [k, l]
输出:
fake_index items
0 1 [a, b, c]
1 1 [d, e]
2 2 [f, g, h]
3 2 [i, j]
4 3 [k, l]
使用 higher-order 函数 transform
+ filter
和 slice
,您可以将数组拆分为大小为 20 的子数组,然后展开它:
val l = 20
val df1 = df.withColumn(
"items",
explode(
expr(
s"filter(transform(items, (x,i)-> IF(i%$l=0, slice(items,i+1,$l), null)), x-> x is not null)"
)
)
)
由于这需要更复杂的转换,因此我使用了数据集。这可能没有那么高效,但它会得到你想要的。
设置
正在创建一些样本数据来模仿您的数据。
val arrayData = Seq(
Row(1,List(1, 2, 3, 4, 5, 6, 7)),
Row(2,List(1, 2, 3, 4)),
Row(3,List(1, 2)),
Row(4,List(1, 2, 3))
)
val arraySchema = new StructType().add("id",IntegerType).add("values", ArrayType(IntegerType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
/*
+---+---------------------+
|id |values |
+---+---------------------+
|1 |[1, 2, 3, 4, 5, 6, 7]|
|2 |[1, 2, 3, 4] |
|3 |[1, 2] |
|4 |[1, 2, 3] |
+---+---------------------+
*/
变换
// encoder for custom type of transformation
implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]
// Here we are using a sliding window of size 3 and step 3.
// This can be made into a generic function for a window of size k.
val df2 = df.map(r => {
val id = r.getInt(0)
val a = r.getSeq[Int](1).toArray
val arrays = a.sliding(3, 3).toArray
(id, arrays)
})
/*
+---+---------------------------------------------------------------+
|_1 |_2 |
+---+---------------------------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2 |[WrappedArray(1, 2, 3), WrappedArray(4)] |
|3 |[WrappedArray(1, 2)] |
|4 |[WrappedArray(1, 2, 3)] |
+---+---------------------------------------------------------------+
*/
val df3 = df2
.withColumnRenamed("_1", "id")
.withColumnRenamed("_2", "values")
/*
+---+---------------------------------------------------------------+
|id |values |
+---+---------------------------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2 |[WrappedArray(1, 2, 3), WrappedArray(4)] |
|3 |[WrappedArray(1, 2)] |
|4 |[WrappedArray(1, 2, 3)] |
+---+---------------------------------------------------------------+
*/
使用爆炸
Expode 将为第二列中的每个数组条目创建一个新元素。
val df4 = df3.withColumn("values", functions.explode($"values"))
/*
+---+---------+
|id |values |
+---+---------+
|1 |[1, 2, 3]|
|1 |[4, 5, 6]|
|1 |[7] |
|2 |[1, 2, 3]|
|2 |[4] |
|3 |[1, 2] |
|4 |[1, 2, 3]|
+---+---------+
*/
限制
这种方法并非没有局限性。
主要是,它在较大的数据集上性能不佳,因为此代码不再使用数据帧 built-in 优化。但是,数据框 API 可能需要使用 window 函数,根据数据的大小,这些函数的性能也会受到限制。如果可以在源头更改此数据,建议这样做。
这种方法还需要为更复杂的事物定义编码器。如果数据模式发生变化,则必须使用不同的编码器。
我有一个如下所示的数据框:
+--------------+--------------------+
|id | items |
+--------------+--------------------+
| 1|[a, b, .... x, y, z]|
+--------------+--------------------+
| 1|[q, z, .... x, b, 5]|
+--------------+--------------------+
| 2|[q, z, .... x, b, 5]|
+--------------+--------------------+
我想拆分行,使 items
列中的数组的长度最多为 20。如果数组的长度大于 20,我想创建新行并将数组拆分以便每个数组的长度为 20 或更短。因此,对于我的示例数据框中的第一行,如果我们假设长度为 10 并且我希望每行的长度最多为 3,我希望它像这样拆分:
+--------------+--------------------+
|id | items |
+--------------+--------------------+
| 1|[a, b, c] |
+--------------+--------------------+
| 1|[z, y, z] |
+--------------+--------------------+
| 1|[e, f, g] |
+--------------+--------------------+
| 1|[q] |
+--------------+--------------------+
理想情况下,如果数组的长度不能被所需的最大长度整除,除最后一行外所有行的长度都应为 3。注意 - id
列不是唯一的
你可以试试这个:
import pandas as pd
max_item_length = 3
df = pd.DataFrame(
{"fake_index": [1, 2, 3],
"items": [["a", "b", "c", "d", "e"], ["f", "g", "h", "i", "j"], ["k", "l"]]}
)
df2 = pd.DataFrame({"fake_index": [], "items": []})
for i in df.index:
try:
df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1][:max_item_length]},
ignore_index=True)
df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1][max_item_length:]},
ignore_index=True)
except:
df2 = df2.append({"fake_index": int(df.iloc[i, 0]), "items": df.iloc[i, 1]}, ignore_index=True)
df = df2
print(df)
输入:
fake_index items
0 1 [a, b, c, d, e]
1 2 [f, g, h, i, j]
2 3 [k, l]
输出:
fake_index items
0 1 [a, b, c]
1 1 [d, e]
2 2 [f, g, h]
3 2 [i, j]
4 3 [k, l]
使用 higher-order 函数 transform
+ filter
和 slice
,您可以将数组拆分为大小为 20 的子数组,然后展开它:
val l = 20
val df1 = df.withColumn(
"items",
explode(
expr(
s"filter(transform(items, (x,i)-> IF(i%$l=0, slice(items,i+1,$l), null)), x-> x is not null)"
)
)
)
由于这需要更复杂的转换,因此我使用了数据集。这可能没有那么高效,但它会得到你想要的。
设置
正在创建一些样本数据来模仿您的数据。
val arrayData = Seq(
Row(1,List(1, 2, 3, 4, 5, 6, 7)),
Row(2,List(1, 2, 3, 4)),
Row(3,List(1, 2)),
Row(4,List(1, 2, 3))
)
val arraySchema = new StructType().add("id",IntegerType).add("values", ArrayType(IntegerType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
/*
+---+---------------------+
|id |values |
+---+---------------------+
|1 |[1, 2, 3, 4, 5, 6, 7]|
|2 |[1, 2, 3, 4] |
|3 |[1, 2] |
|4 |[1, 2, 3] |
+---+---------------------+
*/
变换
// encoder for custom type of transformation
implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]
// Here we are using a sliding window of size 3 and step 3.
// This can be made into a generic function for a window of size k.
val df2 = df.map(r => {
val id = r.getInt(0)
val a = r.getSeq[Int](1).toArray
val arrays = a.sliding(3, 3).toArray
(id, arrays)
})
/*
+---+---------------------------------------------------------------+
|_1 |_2 |
+---+---------------------------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2 |[WrappedArray(1, 2, 3), WrappedArray(4)] |
|3 |[WrappedArray(1, 2)] |
|4 |[WrappedArray(1, 2, 3)] |
+---+---------------------------------------------------------------+
*/
val df3 = df2
.withColumnRenamed("_1", "id")
.withColumnRenamed("_2", "values")
/*
+---+---------------------------------------------------------------+
|id |values |
+---+---------------------------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2 |[WrappedArray(1, 2, 3), WrappedArray(4)] |
|3 |[WrappedArray(1, 2)] |
|4 |[WrappedArray(1, 2, 3)] |
+---+---------------------------------------------------------------+
*/
使用爆炸
Expode 将为第二列中的每个数组条目创建一个新元素。
val df4 = df3.withColumn("values", functions.explode($"values"))
/*
+---+---------+
|id |values |
+---+---------+
|1 |[1, 2, 3]|
|1 |[4, 5, 6]|
|1 |[7] |
|2 |[1, 2, 3]|
|2 |[4] |
|3 |[1, 2] |
|4 |[1, 2, 3]|
+---+---------+
*/
限制
这种方法并非没有局限性。
主要是,它在较大的数据集上性能不佳,因为此代码不再使用数据帧 built-in 优化。但是,数据框 API 可能需要使用 window 函数,根据数据的大小,这些函数的性能也会受到限制。如果可以在源头更改此数据,建议这样做。
这种方法还需要为更复杂的事物定义编码器。如果数据模式发生变化,则必须使用不同的编码器。