缓存有序的 Spark DataFrame 会创建不需要的作业
Caching ordered Spark DataFrame creates unwanted job
我想将 RDD 转换为 DataFrame 并想缓存 RDD 的结果:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as fn
schema = StructType([StructField('t', DoubleType()), StructField('value', DoubleType())])
df = spark.createDataFrame(
sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(),
schema=schema,
verifySchema=False
).orderBy("t") #.cache()
- 如果您不使用
cache
函数,则不会生成任何作业。
- 如果仅在为
cache
生成 orderBy
1 个工作后才使用 cache
:
- 如果仅在
parallelize
之后使用 cache
,则不会生成任何作业。
为什么 cache
在这种情况下会生成工作?
如何避免 cache
的作业生成(缓存 DataFrame 而没有 RDD)?
编辑:我对该问题进行了更多调查,发现没有 orderBy("t")
就不会生成作业。为什么?
我提交了一个 bug ticket 并且关闭了,原因如下:
Caching requires the backing RDD. That requires we also know the
backing partitions, and this is somewhat special for a global order:
it triggers a job (scan) because we need to determine the partition
bounds.
我想将 RDD 转换为 DataFrame 并想缓存 RDD 的结果:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as fn
schema = StructType([StructField('t', DoubleType()), StructField('value', DoubleType())])
df = spark.createDataFrame(
sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(),
schema=schema,
verifySchema=False
).orderBy("t") #.cache()
- 如果您不使用
cache
函数,则不会生成任何作业。 - 如果仅在为
cache
生成orderBy
1 个工作后才使用cache
: - 如果仅在
parallelize
之后使用cache
,则不会生成任何作业。
为什么 cache
在这种情况下会生成工作?
如何避免 cache
的作业生成(缓存 DataFrame 而没有 RDD)?
编辑:我对该问题进行了更多调查,发现没有 orderBy("t")
就不会生成作业。为什么?
我提交了一个 bug ticket 并且关闭了,原因如下:
Caching requires the backing RDD. That requires we also know the backing partitions, and this is somewhat special for a global order: it triggers a job (scan) because we need to determine the partition bounds.