如何在不使用合并的情况下在本地系统的单个文件中写入 spark 数据帧

How to write spark dataframe in a single file in local system without using coalesce

我想从 pyspark 数据帧生成一个 avro 文件,目前我正在做 coalesce 如下

df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')

但这现在会导致内存问题,因为所有数据在写入之前都会被提取到内存中,而且我的数据大小每天都在持续增长。所以我想按每个分区写入数据,这样数据就会以块的形式写入磁盘,而不会引起 OOM 问题。我发现 toLocalIterator 有助于实现这一目标。但我不确定如何使用它。我尝试了以下用法,它 returns 所有行

iter = df.toLocalIterator()
for i in iter:
    print('writing some data')
    # write the data into disk/file

迭代器迭代每一行而不是每个分区。我应该怎么做?

当你df = df.coalesce(1) 所有数据都收集到一个工作节点中。如果该节点由于节点上的资源限制而无法处理如此巨大的任务,则该作业将因 OOM 错误而失败。

根据 spark 文档 toLocalIterator Returns 包含当前数据集中所有行的迭代器 它可以消耗的最大内存相当于最大此数据集中的分区

LocalIterator 是如何工作的?

第一个分区被发送给驱动程序。如果继续迭代并到达第一个分区的末尾,第二个分区将被发送到驱动程序节点,依此类推直到最后一个分区..这就是为什么(它可以占用的最大内存=最大分区) 确保你的主节点有足够的内存和磁盘。

toLocalIterator.next() 方法确保在前面的分区处理完成后提取下一个分区记录。

what you can do is 

    //batch objects like 1000 per batch
    df.toLocalIterator().foreach(obj => {
      //add object in array
      //if batch size is reached ... 
        //then serialize them and use FileOutputStream and save in local location 
    })

注意:确保缓存您的 parentDF .. 否则在某些情况下每个分区都需要重新计算。