在 pyspark 中高效地以分布式方式生成大型 DataFrame(没有 pyspark.sql.Row)
Generating large DataFrame in a distributed way in pyspark efficiently (without pyspark.sql.Row)
问题归结为以下几点:我想使用现有的并行化输入集合在 pyspark 中生成一个数据帧,并且一个函数给定一个输入可以生成相对较大的行。在下面的示例中,我想使用例如生成 10^12 行数据框1000 名执行者:
def generate_data(one_integer):
import numpy as np
from pyspark.sql import Row
M = 10000000 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
row_type = Row("seed", "n", "x")
return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]
N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
StructField("seed", IntegerType()),
StructField("n", IntegerType()),
StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)
(我真的不想研究给定种子的随机数分布——这只是我想出的一个例子来说明大数据帧不是从仓库加载,而是由代码)
上面的代码几乎完全符合我的要求。问题是它以一种非常低效的方式实现——以为每一行创建一个 python Row 对象为代价,然后将 python Row 对象转换为内部 Spark 柱状表示。
有没有一种方法可以通过让 spark 知道这些是一批值的列来转换已经在柱状表示中的一批行(例如上面的一个或几个 numpy 数组 np_array
) ?
例如我可以编写代码来生成 python 集合 RDD,其中每个元素都是 pyarrow.RecordBatch 或 pandas.DataFrame,但我无法找到一种方法将其中任何一个转换为 Spark DataFrame 而不创建进程中 pyspark Row 对象的 RDD。
至少有十几篇文章举例说明了如何使用 pyarrow + pandas 将本地(到驱动程序)pandas 数据帧有效地转换为 Spark 数据帧,但这不是一个我的选择,因为我需要在执行程序上以分布式方式实际生成数据,而不是在驱动程序上生成一个 pandas 数据帧并将其发送给执行程序。
UPD.
我找到了一种避免创建 Row 对象的方法——使用 python 元组的 RDD。正如预期的那样,它仍然太慢,但仍然比使用 Row 对象快一点。尽管如此,这并不是我真正想要的(这是将柱状数据从 python 传递到 Spark 的一种非常有效的方式)。
还测量了在机器上执行某些操作的时间(粗略的方式,测量时间有相当大的变化,但在我看来仍然具有代表性):
有问题的数据集是 10M 行,3 列(一列是常量整数,其他是从 0 到 10M-1 的整数范围,第三个是使用 np.random.random_sample
:
生成的浮点值
- 本地生成 pandas 数据帧(1000 万行):~440-450 毫秒
- 本地生成 python 个 spark.sql.Row 对象列表(1000 万行):~12-15s
- 本地生成 python 表示行(10M 行)的元组列表:~3.4-3.5s
仅使用 1 个执行器和 1 个初始种子值生成 Spark 数据帧:
- 使用
spark.createDataFrame(row_rdd, schema=my_schema)
:~70-80s
- 使用
spark.createDataFrame(tuple_rdd, schema=my_schema)
:~40-45s
- (非分布式创建)使用
spark.createDataFrame(pandas_df, schema=my_schema)
:~0.4-0.5s(没有 pandas df 生成本身花费的时间大致相同)- spark.sql.execution.arrow.enabled
设置为 true。
本地到驱动程序 pandas 数据帧在 ~1 秒内转换为 1000 万行 Spark 数据帧的示例让我有理由相信执行程序中生成的数据帧也应该是可能的。然而,我现在能达到的最快速度是使用 python 元组的 RDD 对 1000 万行进行约 40 秒。
所以问题仍然存在 - 有没有办法在 pyspark 中以分布式方式高效地生成大型 Spark 数据帧?
这是不使用 Row
的问题解决方案 - 仅基于 RDD。我认为这可能是最有效的方法,因为它使用 map
来计算你的函数输出,并使用 flatMap
来组合这些输出——这两个操作都是在 RDD 上执行的,所以一切都应该是分布式的。
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
def generate_data(one_integer):
M = 2 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
return [(one_integer, i, float(np_array[i])) for i in range(M)]
N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)
df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()
听起来瓶颈是从 RDD -> Dataframes 的转换,而且手头的功能相当快,pandas 通过 pyarrow 到 spark DF 的 DF 转换非常快。这里有两个可能的解决方案:
- 因为并行创建 pandas df 很容易,而不是 return 从执行器中获取它,使用
df.to_parquet
编写生成的 df,即:
def generate_data(seed):
M = 10
np.random.seed(seed)
np_array = np.random.random_sample(M) # generates an array of M random values
df = pd.DataFrame(np_array, columns=["x"])
df["seed"] = seed
df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"
之后生成的 parquet 文件中的 Spark 读取应该是微不足道的。然后你的瓶颈变成了 IO 限制,这应该比 spark 转换 tuples/Row 类型更快。
- 如果您不允许将任何内容保存到文件,
pandas_udf
和 GROUPED_MAP
可能会帮助您,前提是您的 spark 版本足够新。它也使用 pyarrow 在 spark DF 和 pandas DF 之间进行转换,因此它应该比使用元组更快,并且允许您以分布式方式从 UDF 创建和 return pandas DF。
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
N = 10
df = spark.createDataFrame(
[(i,) for i in range(N)], ["seed"]
)
def generate_data(seed):
M = 10
np.random.seed(seed)
np_array = np.random.random_sample(M) # generates an array of M random values
df = pd.DataFrame(np_array, columns=["x"])
df["seed"] = seed
return df.reset_index()
@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
output = []
for idx, row in pdf.iterrows():
output.append(generate_data(row["seed"]))
return pd.concat(output)
df.groupby("seed").apply(generate_data_udf).show()
较慢的部分是 groupby
,您可以加快速度,具体取决于您如何将种子分批进入 generate_data_udf
,即:
@udf(returnType=IntegerType())
def batch_seed(seed):
return seed // 10
df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()
这里有一个不使用 RDD 或创建 Rows 的解决方案,仅使用数据帧操作:
(代码在 scala 中,但在 python 中做同样的事情应该很简单)
val N = 100000
//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)
spark
.range(N)
.toDF("seed")
.withColumn("arr", generate_data_udf($"seed"))
.select(
$"seed",
explode($"arr") as "exp"
)
.select(
$"seed",
$"exp._1" as "n",
$"exp._2" as "x"
)
问题归结为以下几点:我想使用现有的并行化输入集合在 pyspark 中生成一个数据帧,并且一个函数给定一个输入可以生成相对较大的行。在下面的示例中,我想使用例如生成 10^12 行数据框1000 名执行者:
def generate_data(one_integer):
import numpy as np
from pyspark.sql import Row
M = 10000000 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
row_type = Row("seed", "n", "x")
return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]
N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
StructField("seed", IntegerType()),
StructField("n", IntegerType()),
StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)
(我真的不想研究给定种子的随机数分布——这只是我想出的一个例子来说明大数据帧不是从仓库加载,而是由代码)
上面的代码几乎完全符合我的要求。问题是它以一种非常低效的方式实现——以为每一行创建一个 python Row 对象为代价,然后将 python Row 对象转换为内部 Spark 柱状表示。
有没有一种方法可以通过让 spark 知道这些是一批值的列来转换已经在柱状表示中的一批行(例如上面的一个或几个 numpy 数组 np_array
) ?
例如我可以编写代码来生成 python 集合 RDD,其中每个元素都是 pyarrow.RecordBatch 或 pandas.DataFrame,但我无法找到一种方法将其中任何一个转换为 Spark DataFrame 而不创建进程中 pyspark Row 对象的 RDD。
至少有十几篇文章举例说明了如何使用 pyarrow + pandas 将本地(到驱动程序)pandas 数据帧有效地转换为 Spark 数据帧,但这不是一个我的选择,因为我需要在执行程序上以分布式方式实际生成数据,而不是在驱动程序上生成一个 pandas 数据帧并将其发送给执行程序。
UPD. 我找到了一种避免创建 Row 对象的方法——使用 python 元组的 RDD。正如预期的那样,它仍然太慢,但仍然比使用 Row 对象快一点。尽管如此,这并不是我真正想要的(这是将柱状数据从 python 传递到 Spark 的一种非常有效的方式)。
还测量了在机器上执行某些操作的时间(粗略的方式,测量时间有相当大的变化,但在我看来仍然具有代表性):
有问题的数据集是 10M 行,3 列(一列是常量整数,其他是从 0 到 10M-1 的整数范围,第三个是使用 np.random.random_sample
:
- 本地生成 pandas 数据帧(1000 万行):~440-450 毫秒
- 本地生成 python 个 spark.sql.Row 对象列表(1000 万行):~12-15s
- 本地生成 python 表示行(10M 行)的元组列表:~3.4-3.5s
仅使用 1 个执行器和 1 个初始种子值生成 Spark 数据帧:
- 使用
spark.createDataFrame(row_rdd, schema=my_schema)
:~70-80s - 使用
spark.createDataFrame(tuple_rdd, schema=my_schema)
:~40-45s - (非分布式创建)使用
spark.createDataFrame(pandas_df, schema=my_schema)
:~0.4-0.5s(没有 pandas df 生成本身花费的时间大致相同)-spark.sql.execution.arrow.enabled
设置为 true。
本地到驱动程序 pandas 数据帧在 ~1 秒内转换为 1000 万行 Spark 数据帧的示例让我有理由相信执行程序中生成的数据帧也应该是可能的。然而,我现在能达到的最快速度是使用 python 元组的 RDD 对 1000 万行进行约 40 秒。
所以问题仍然存在 - 有没有办法在 pyspark 中以分布式方式高效地生成大型 Spark 数据帧?
这是不使用 Row
的问题解决方案 - 仅基于 RDD。我认为这可能是最有效的方法,因为它使用 map
来计算你的函数输出,并使用 flatMap
来组合这些输出——这两个操作都是在 RDD 上执行的,所以一切都应该是分布式的。
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
def generate_data(one_integer):
M = 2 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
return [(one_integer, i, float(np_array[i])) for i in range(M)]
N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)
df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()
听起来瓶颈是从 RDD -> Dataframes 的转换,而且手头的功能相当快,pandas 通过 pyarrow 到 spark DF 的 DF 转换非常快。这里有两个可能的解决方案:
- 因为并行创建 pandas df 很容易,而不是 return 从执行器中获取它,使用
df.to_parquet
编写生成的 df,即:
def generate_data(seed):
M = 10
np.random.seed(seed)
np_array = np.random.random_sample(M) # generates an array of M random values
df = pd.DataFrame(np_array, columns=["x"])
df["seed"] = seed
df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"
之后生成的 parquet 文件中的 Spark 读取应该是微不足道的。然后你的瓶颈变成了 IO 限制,这应该比 spark 转换 tuples/Row 类型更快。
- 如果您不允许将任何内容保存到文件,
pandas_udf
和GROUPED_MAP
可能会帮助您,前提是您的 spark 版本足够新。它也使用 pyarrow 在 spark DF 和 pandas DF 之间进行转换,因此它应该比使用元组更快,并且允许您以分布式方式从 UDF 创建和 return pandas DF。
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
N = 10
df = spark.createDataFrame(
[(i,) for i in range(N)], ["seed"]
)
def generate_data(seed):
M = 10
np.random.seed(seed)
np_array = np.random.random_sample(M) # generates an array of M random values
df = pd.DataFrame(np_array, columns=["x"])
df["seed"] = seed
return df.reset_index()
@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
output = []
for idx, row in pdf.iterrows():
output.append(generate_data(row["seed"]))
return pd.concat(output)
df.groupby("seed").apply(generate_data_udf).show()
较慢的部分是 groupby
,您可以加快速度,具体取决于您如何将种子分批进入 generate_data_udf
,即:
@udf(returnType=IntegerType())
def batch_seed(seed):
return seed // 10
df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()
这里有一个不使用 RDD 或创建 Rows 的解决方案,仅使用数据帧操作:
(代码在 scala 中,但在 python 中做同样的事情应该很简单)
val N = 100000
//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)
spark
.range(N)
.toDF("seed")
.withColumn("arr", generate_data_udf($"seed"))
.select(
$"seed",
explode($"arr") as "exp"
)
.select(
$"seed",
$"exp._1" as "n",
$"exp._2" as "x"
)