如何有效地乘以 PySpark 数据框中的行?
How to efficiently multiply rows in PySpark dataframe?
我正在尝试通过使用现有的小型数据集并使其变得更大来制作合成数据集。我希望目标大小为 20M 行。
我目前的方法:
for i in range(int(log(130000, 2))):
table_copy = table_copy.unionAll(table_copy)
但是在第 12 次迭代(共 17 次)之后这会减慢很多。有没有更快的方法将由 150 行组成的数据帧变成 20M?
如果我没理解错的话。您想要扩展或扩大 具有相同数据的数据集:
val replicas = 5 // calcu yourself and i've never try 20M
val dsReplicated = ds.flatMap(a => 0 until replicas map ((a, _))).map(_._1)
或数据框:
val dfReplicated = df
.withColumn("__temporarily__", functions.typedLit((0 until replicas).toArray))
.withColumn("idx", functions.explode($"__temporarily__"))
.drop($"__temporarily__")
.drop($"idx")
这个效果最好:
(5 秒 = 2000 万行)
df = spark.range(150)
factor = 135000
df = df.withColumn('a', F.expr(f'explode(array_repeat(0,{factor}))')).drop('a')
提出的想法
你的情况可能只是:
table_copy = table_copy.withColumn('a', F.expr('explode(array_repeat(0,135000))')).drop('a')
其他测试选项
(16 秒 = 150 万行)
import pyspark.sql.functions as F
df = spark.range(150)
df = df.withColumn('array', F.explode(F.array(*map(F.lit, range(1000)))))
df = df.drop('array')
(11 秒 = 38k 行):
def union_self(df, p):
if p:
df = union_self(df, p - 1)
return df.union(df)
return df
df = spark.range(150)
df = union_self(df, 8)
(16 秒 = 38k 行):
from functools import reduce
df = spark.range(150)
df = reduce(lambda df1, df2: df1.union(df2), [df] * 256)
我正在尝试通过使用现有的小型数据集并使其变得更大来制作合成数据集。我希望目标大小为 20M 行。 我目前的方法:
for i in range(int(log(130000, 2))):
table_copy = table_copy.unionAll(table_copy)
但是在第 12 次迭代(共 17 次)之后这会减慢很多。有没有更快的方法将由 150 行组成的数据帧变成 20M?
如果我没理解错的话。您想要扩展或扩大 具有相同数据的数据集:
val replicas = 5 // calcu yourself and i've never try 20M
val dsReplicated = ds.flatMap(a => 0 until replicas map ((a, _))).map(_._1)
或数据框:
val dfReplicated = df
.withColumn("__temporarily__", functions.typedLit((0 until replicas).toArray))
.withColumn("idx", functions.explode($"__temporarily__"))
.drop($"__temporarily__")
.drop($"idx")
这个效果最好:
(5 秒 = 2000 万行)
df = spark.range(150)
factor = 135000
df = df.withColumn('a', F.expr(f'explode(array_repeat(0,{factor}))')).drop('a')
提出的想法
你的情况可能只是:
table_copy = table_copy.withColumn('a', F.expr('explode(array_repeat(0,135000))')).drop('a')
其他测试选项
(16 秒 = 150 万行)
import pyspark.sql.functions as F
df = spark.range(150)
df = df.withColumn('array', F.explode(F.array(*map(F.lit, range(1000)))))
df = df.drop('array')
(11 秒 = 38k 行):
def union_self(df, p):
if p:
df = union_self(df, p - 1)
return df.union(df)
return df
df = spark.range(150)
df = union_self(df, 8)
(16 秒 = 38k 行):
from functools import reduce
df = spark.range(150)
df = reduce(lambda df1, df2: df1.union(df2), [df] * 256)