为什么在将 Apache Arrow 用于字符串类型时 pySpark 会崩溃?
Why does pySpark crash when using Apache Arrow for string types?
为了在大型数据集上获得一些离群值图,我需要将火花 DataFrame
转换为 pandas。对于 Apache Arrow,一个简单的 运行 在将 x 转换为字符串时使我的 pyspark 控制台崩溃(没有转换它工作正常),为什么?
Using Python version 3.8.9 (default, Apr 10 2021 15:47:22)
Spark context Web UI available at http://6d0b1018a45a:4040
Spark context available as 'sc' (master = local[*], app id = local-1621164597906).
SparkSession available as 'spark'.
>>> import time
>>> from pyspark.sql.functions import rand
>>> from pyspark.sql import functions as F
>>> spark = SparkSession.builder.appName("Console_Test").getOrCreate()
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
21/05/16 11:31:03 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
>>> a_df = spark.range(1 << 25).toDF("id").withColumn("x", rand())
>>> a_df = a_df.withColumn("id", F.col("id").cast("string"))
>>> start_t = time.time()
>>> a_pd = a_df.toPandas()
Killed
#
此外,我注意到 spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
等选项似乎没有效果,因为网络 ui 显示分配给任务的记录明显超过 5000。
任何关于如何解决 pyspark 控制台崩溃或更直接地渲染大型散点图的指示都将受到高度赞赏 - 我已经(未成功)尝试找到一种方法来应用 Table.to_pandas(split_blocks=True, self_destruct=True)
但没有得到能够来自火花的结构 DataFrame
.
您尝试将 33.5 mio (2^25) 行转换为 Pandas 数据框。这将导致 OutOfMemoryError,因为所有数据都将传输到 Spark 驱动程序。
查找异常值的一种方法是计算列 x
的 histogram,然后在创建 Pandas数据框:
hist = a_df.select("x").rdd.flatMap(lambda x: x).histogram(10) #create 10 bins
hist
是两个数组的元组:第一个数组包含 bin 的边界,第二个数组包含每个 bin 中的元素数:
([1.7855041778425118e-08,
0.1000000152099446,
0.20000001256484742,
0.30000000991975023,
0.40000000727465307,
0.5000000046295558,
0.6000000019844587,
0.6999999993393615,
0.7999999966942644,
0.8999999940491672,
0.99999999140407],
[3355812,
3356891,
3352364,
3352438,
3357564,
3356213,
3354933,
3355144,
3357241,
3355832])
rand 创建均匀分布的随机数,因此这种情况下的直方图不是很有趣。但是对于真实世界的分布,直方图会很有用。
为了在大型数据集上获得一些离群值图,我需要将火花 DataFrame
转换为 pandas。对于 Apache Arrow,一个简单的 运行 在将 x 转换为字符串时使我的 pyspark 控制台崩溃(没有转换它工作正常),为什么?
Using Python version 3.8.9 (default, Apr 10 2021 15:47:22)
Spark context Web UI available at http://6d0b1018a45a:4040
Spark context available as 'sc' (master = local[*], app id = local-1621164597906).
SparkSession available as 'spark'.
>>> import time
>>> from pyspark.sql.functions import rand
>>> from pyspark.sql import functions as F
>>> spark = SparkSession.builder.appName("Console_Test").getOrCreate()
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
21/05/16 11:31:03 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
>>> a_df = spark.range(1 << 25).toDF("id").withColumn("x", rand())
>>> a_df = a_df.withColumn("id", F.col("id").cast("string"))
>>> start_t = time.time()
>>> a_pd = a_df.toPandas()
Killed
#
此外,我注意到 spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
等选项似乎没有效果,因为网络 ui 显示分配给任务的记录明显超过 5000。
任何关于如何解决 pyspark 控制台崩溃或更直接地渲染大型散点图的指示都将受到高度赞赏 - 我已经(未成功)尝试找到一种方法来应用 Table.to_pandas(split_blocks=True, self_destruct=True)
但没有得到能够来自火花的结构 DataFrame
.
您尝试将 33.5 mio (2^25) 行转换为 Pandas 数据框。这将导致 OutOfMemoryError,因为所有数据都将传输到 Spark 驱动程序。
查找异常值的一种方法是计算列 x
的 histogram,然后在创建 Pandas数据框:
hist = a_df.select("x").rdd.flatMap(lambda x: x).histogram(10) #create 10 bins
hist
是两个数组的元组:第一个数组包含 bin 的边界,第二个数组包含每个 bin 中的元素数:
([1.7855041778425118e-08,
0.1000000152099446,
0.20000001256484742,
0.30000000991975023,
0.40000000727465307,
0.5000000046295558,
0.6000000019844587,
0.6999999993393615,
0.7999999966942644,
0.8999999940491672,
0.99999999140407],
[3355812,
3356891,
3352364,
3352438,
3357564,
3356213,
3354933,
3355144,
3357241,
3355832])
rand 创建均匀分布的随机数,因此这种情况下的直方图不是很有趣。但是对于真实世界的分布,直方图会很有用。