如何在不使用合并的情况下在本地系统的单个文件中写入 spark 数据帧
How to write spark dataframe in a single file in local system without using coalesce
我想从 pyspark 数据帧生成一个 avro 文件,目前我正在做 coalesce
如下
df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')
但这现在会导致内存问题,因为所有数据在写入之前都会被提取到内存中,而且我的数据大小每天都在持续增长。所以我想按每个分区写入数据,这样数据就会以块的形式写入磁盘,而不会引起 OOM 问题。我发现 toLocalIterator
有助于实现这一目标。但我不确定如何使用它。我尝试了以下用法,它 returns 所有行
iter = df.toLocalIterator()
for i in iter:
print('writing some data')
# write the data into disk/file
迭代器迭代每一行而不是每个分区。我应该怎么做?
当你df = df.coalesce(1)
所有数据都收集到一个工作节点中。如果该节点由于节点上的资源限制而无法处理如此巨大的任务,则该作业将因 OOM 错误而失败。
根据 spark 文档 toLocalIterator Returns 包含当前数据集中所有行的迭代器 和 它可以消耗的最大内存相当于最大此数据集中的分区
LocalIterator 是如何工作的?
第一个分区被发送给驱动程序。如果继续迭代并到达第一个分区的末尾,第二个分区将被发送到驱动程序节点,依此类推直到最后一个分区..这就是为什么(它可以占用的最大内存=最大分区)
确保你的主节点有足够的内存和磁盘。
toLocalIterator.next() 方法确保在前面的分区处理完成后提取下一个分区记录。
what you can do is
//batch objects like 1000 per batch
df.toLocalIterator().foreach(obj => {
//add object in array
//if batch size is reached ...
//then serialize them and use FileOutputStream and save in local location
})
注意:确保缓存您的 parentDF .. 否则在某些情况下每个分区都需要重新计算。
我想从 pyspark 数据帧生成一个 avro 文件,目前我正在做 coalesce
如下
df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')
但这现在会导致内存问题,因为所有数据在写入之前都会被提取到内存中,而且我的数据大小每天都在持续增长。所以我想按每个分区写入数据,这样数据就会以块的形式写入磁盘,而不会引起 OOM 问题。我发现 toLocalIterator
有助于实现这一目标。但我不确定如何使用它。我尝试了以下用法,它 returns 所有行
iter = df.toLocalIterator()
for i in iter:
print('writing some data')
# write the data into disk/file
迭代器迭代每一行而不是每个分区。我应该怎么做?
当你df = df.coalesce(1)
所有数据都收集到一个工作节点中。如果该节点由于节点上的资源限制而无法处理如此巨大的任务,则该作业将因 OOM 错误而失败。
根据 spark 文档 toLocalIterator Returns 包含当前数据集中所有行的迭代器 和 它可以消耗的最大内存相当于最大此数据集中的分区
LocalIterator 是如何工作的?
第一个分区被发送给驱动程序。如果继续迭代并到达第一个分区的末尾,第二个分区将被发送到驱动程序节点,依此类推直到最后一个分区..这就是为什么(它可以占用的最大内存=最大分区) 确保你的主节点有足够的内存和磁盘。
toLocalIterator.next() 方法确保在前面的分区处理完成后提取下一个分区记录。
what you can do is
//batch objects like 1000 per batch
df.toLocalIterator().foreach(obj => {
//add object in array
//if batch size is reached ...
//then serialize them and use FileOutputStream and save in local location
})
注意:确保缓存您的 parentDF .. 否则在某些情况下每个分区都需要重新计算。