加入一个巨大的火花数据框
Joining a large and a ginormous spark dataframe
我有两个数据框,df1 有 600 万行,df2 有 10 亿行。
我试过标准 df1.join(df2,df1("id")<=>df2("id2"))
,但是 运行 内存不足。
df1 太大,无法放入广播连接。
我什至尝试过布隆过滤器,但它也太大了,无法放入广播中,但仍然有用。
我尝试过的唯一没有出错的方法是将 df1 分成 300,000 行块并在 foreach 循环中与 df2 连接。但这比它可能需要的时间长一个数量级(可能是因为它太大而不适合作为持久性导致它重做到那个点的分裂)。重新组合结果也需要一段时间。
你是怎么解决这个问题的?
一些注意事项:
df1 是 df2 的子集。 df1=df2.where("fin<1").selectExpr("id as id2").distinct()
我对 df2 中所有具有一次 fin<1 的 id 的行感兴趣,这意味着我不能一步完成。
df2 中大约有 2 亿个唯一 ID。
这里有一些相关的 spark 设置:
spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000
我得到的错误是:
16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)
和
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory
你可以尝试设置 spark.shuffle.memoryFraction=0.0 吗?
这将导致 shuffle 将所有内容溢出到磁盘,您将永远不会遇到 OOM 错误
据我所知,您遇到了分区过大的问题(可能是由于数据较大)
您可以尝试几种方法:
尝试将 spark.sql.shuffle.partitions 定义为 2048 甚至更多(默认为 200)。加入你的 df-s 时会有洗牌。尝试使用这个参数,这样更大的数据/这个参数的总量将约为 64Mb-100Mb(取决于文件格式)。一般来说,您应该在 spark UI 中看到每个任务(每个分区)处理 "normal" 数据量(最大 64MB-100MB)
如果 first 不起作用,我可以建议在 RDD api 中加入此操作。将您的 df 转换为 RDD。然后通过 HashPartitioner(分区数)对两个 RDD 进行分区。何时应该按照我之前的描述计算分区数。
spark 开发人员最近添加了新选项:您可以将巨大的 table 存储到 N 个存储桶中(即存储它以备加入)。目前的限制很少,但它可以完全消除混洗巨大的数据。 bucketBy 仅支持 saveAsTable api 而不是保存一个。在你 bucketBy 数据和它被分桶之后,然后在下一次迭代中你可以加载这个数据作为外部 table 同时提供分桶规范(参见 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html)
创建 TABLE 巨大
--...这里你必须指定模式
使用镶木地板
CLUSTERED BY (a,b,c) INTO N 个桶
地点 'hdfs://your-path'
然后,当您将巨大的 table 作为分桶加载后,您可以加载大 table 并将其重新分区到相同数量的桶和相同的列(df.repartition (N,a,b,c))
我有两个数据框,df1 有 600 万行,df2 有 10 亿行。
我试过标准 df1.join(df2,df1("id")<=>df2("id2"))
,但是 运行 内存不足。
df1 太大,无法放入广播连接。
我什至尝试过布隆过滤器,但它也太大了,无法放入广播中,但仍然有用。
我尝试过的唯一没有出错的方法是将 df1 分成 300,000 行块并在 foreach 循环中与 df2 连接。但这比它可能需要的时间长一个数量级(可能是因为它太大而不适合作为持久性导致它重做到那个点的分裂)。重新组合结果也需要一段时间。
你是怎么解决这个问题的?
一些注意事项:
df1 是 df2 的子集。 df1=df2.where("fin<1").selectExpr("id as id2").distinct()
我对 df2 中所有具有一次 fin<1 的 id 的行感兴趣,这意味着我不能一步完成。
df2 中大约有 2 亿个唯一 ID。
这里有一些相关的 spark 设置:
spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000
我得到的错误是:
16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)
和
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory
你可以尝试设置 spark.shuffle.memoryFraction=0.0 吗?
这将导致 shuffle 将所有内容溢出到磁盘,您将永远不会遇到 OOM 错误
据我所知,您遇到了分区过大的问题(可能是由于数据较大) 您可以尝试几种方法:
尝试将 spark.sql.shuffle.partitions 定义为 2048 甚至更多(默认为 200)。加入你的 df-s 时会有洗牌。尝试使用这个参数,这样更大的数据/这个参数的总量将约为 64Mb-100Mb(取决于文件格式)。一般来说,您应该在 spark UI 中看到每个任务(每个分区)处理 "normal" 数据量(最大 64MB-100MB)
如果 first 不起作用,我可以建议在 RDD api 中加入此操作。将您的 df 转换为 RDD。然后通过 HashPartitioner(分区数)对两个 RDD 进行分区。何时应该按照我之前的描述计算分区数。
spark 开发人员最近添加了新选项:您可以将巨大的 table 存储到 N 个存储桶中(即存储它以备加入)。目前的限制很少,但它可以完全消除混洗巨大的数据。 bucketBy 仅支持 saveAsTable api 而不是保存一个。在你 bucketBy 数据和它被分桶之后,然后在下一次迭代中你可以加载这个数据作为外部 table 同时提供分桶规范(参见 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html)
创建 TABLE 巨大 --...这里你必须指定模式 使用镶木地板 CLUSTERED BY (a,b,c) INTO N 个桶 地点 'hdfs://your-path'
然后,当您将巨大的 table 作为分桶加载后,您可以加载大 table 并将其重新分区到相同数量的桶和相同的列(df.repartition (N,a,b,c))