如何将巨大的 pandas 数据帧保存到 hdfs?
How to save a huge pandas dataframe to hdfs?
我正在使用 pandas 和 spark 数据帧。数据帧总是非常大(> 20 GB),标准的 spark 函数不足以满足这些大小。目前我正在将我的 pandas 数据帧转换为这样的 spark 数据帧:
dataframe = spark.createDataFrame(pandas_dataframe)
我做这个转换是因为使用 spark 将数据帧写入 hdfs 非常容易:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
但是对于大于 2 GB 的数据帧,转换失败。
如果我将 spark 数据帧转换为 pandas 我可以使用 pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
这是从 spark 到 pandas 的快速转换,它也适用于大于 2 GB 的数据帧。我还找不到相反的方法。意思是有一个 pandas 数据框,我在 pyarrow 的帮助下将其转换为 spark。问题是我真的找不到如何将 pandas 数据帧写入 hdfs。
我的pandas版本:0.19.0
来自https://issues.apache.org/jira/browse/SPARK-6235
Support for parallelizing R data.frame larger than 2GB
已解决。
来自https://pandas.pydata.org/pandas-docs/stable/r_interface.html
Converting DataFrames into R objects
您可以将 pandas 数据帧转换为 R data.frame
所以也许转换 pandas -> R -> Spark -> hdfs?
一个 hack 可能是从大数据帧创建 N pandas 个数据帧(每个小于 2 GB)(水平分区)并创建 N 个不同的 spark 数据帧,然后合并(联合)它们以创建最终的一个写入HDFS。我假设您的主机功能强大,但您也有可用的集群 运行 Spark。
Meaning having a pandas dataframe which I transform to spark with the help of pyarrow.
pyarrow.Table.fromPandas
是您要查找的函数:
Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)
Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa
pdf = ... # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table
结果可以直接写入Parquet/HDFS,无需通过Spark传递数据:
import pyarrow.parquet as pq
fs = pa.hdfs.connect()
with fs.open(path, "wb") as fw
pq.write_table(adf, fw)
另见
- @WesMcKinney answer to read a parquet files from HDFS using PyArrow.
- Reading and Writing the Apache Parquet Format in the
pyarrow
documentation.
- Native Hadoop file system (HDFS) connectivity in Python
Spark 笔记:
此外,由于 Spark 2.3(当前主版本)直接在 createDataFrame
中支持 Arrow(SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame). It uses SparkContext.defaultParallelism
to compute number of chunks 因此您可以轻松控制单个批次的大小。
最后 defaultParallelism
可用于控制使用标准 _convert_from_pandas
生成的分区数量,有效地将切片的大小减小到更易于管理的程度。
不幸的是,这些不太可能解决您的 current memory problems。两者都依赖于 parallelize
,因此将所有数据存储在 driver 节点的内存中。切换到 Arrow 或调整配置只能加速进程或解决块大小限制。
实际上,只要您使用本地 Pandas DataFrame
作为输入,我看不出有任何理由在这里切换到 Spark。这种情况下最严重的瓶颈是 driver 的网络 I/O,分发数据不会解决这个问题。
另一种方法是将您的 pandas 数据帧转换为 spark 数据帧(使用 pyspark)并使用 save 命令将其保存到 hdfs。
例子
df = pd.read_csv("data/as/foo.csv")
df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)
此处 astype
将您的列的类型从 object
更改为 string
。这可以避免引发异常,因为 spark 无法弄清楚 pandas 类型 object
。但要确保这些列确实是字符串类型。
现在将您的 df 保存在 hdfs 中:
sdf.write.csv('mycsv.csv')
我正在使用 pandas 和 spark 数据帧。数据帧总是非常大(> 20 GB),标准的 spark 函数不足以满足这些大小。目前我正在将我的 pandas 数据帧转换为这样的 spark 数据帧:
dataframe = spark.createDataFrame(pandas_dataframe)
我做这个转换是因为使用 spark 将数据帧写入 hdfs 非常容易:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
但是对于大于 2 GB 的数据帧,转换失败。 如果我将 spark 数据帧转换为 pandas 我可以使用 pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
这是从 spark 到 pandas 的快速转换,它也适用于大于 2 GB 的数据帧。我还找不到相反的方法。意思是有一个 pandas 数据框,我在 pyarrow 的帮助下将其转换为 spark。问题是我真的找不到如何将 pandas 数据帧写入 hdfs。
我的pandas版本:0.19.0
来自https://issues.apache.org/jira/browse/SPARK-6235
Support for parallelizing R data.frame larger than 2GB
已解决。
来自https://pandas.pydata.org/pandas-docs/stable/r_interface.html
Converting DataFrames into R objects
您可以将 pandas 数据帧转换为 R data.frame
所以也许转换 pandas -> R -> Spark -> hdfs?
一个 hack 可能是从大数据帧创建 N pandas 个数据帧(每个小于 2 GB)(水平分区)并创建 N 个不同的 spark 数据帧,然后合并(联合)它们以创建最终的一个写入HDFS。我假设您的主机功能强大,但您也有可用的集群 运行 Spark。
Meaning having a pandas dataframe which I transform to spark with the help of pyarrow.
pyarrow.Table.fromPandas
是您要查找的函数:
Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True) Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa
pdf = ... # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table
结果可以直接写入Parquet/HDFS,无需通过Spark传递数据:
import pyarrow.parquet as pq
fs = pa.hdfs.connect()
with fs.open(path, "wb") as fw
pq.write_table(adf, fw)
另见
- @WesMcKinney answer to read a parquet files from HDFS using PyArrow.
- Reading and Writing the Apache Parquet Format in the
pyarrow
documentation. - Native Hadoop file system (HDFS) connectivity in Python
Spark 笔记:
此外,由于 Spark 2.3(当前主版本)直接在 createDataFrame
中支持 Arrow(SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame). It uses SparkContext.defaultParallelism
to compute number of chunks 因此您可以轻松控制单个批次的大小。
最后 defaultParallelism
可用于控制使用标准 _convert_from_pandas
生成的分区数量,有效地将切片的大小减小到更易于管理的程度。
不幸的是,这些不太可能解决您的 current memory problems。两者都依赖于 parallelize
,因此将所有数据存储在 driver 节点的内存中。切换到 Arrow 或调整配置只能加速进程或解决块大小限制。
实际上,只要您使用本地 Pandas DataFrame
作为输入,我看不出有任何理由在这里切换到 Spark。这种情况下最严重的瓶颈是 driver 的网络 I/O,分发数据不会解决这个问题。
另一种方法是将您的 pandas 数据帧转换为 spark 数据帧(使用 pyspark)并使用 save 命令将其保存到 hdfs。 例子
df = pd.read_csv("data/as/foo.csv")
df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)
此处 astype
将您的列的类型从 object
更改为 string
。这可以避免引发异常,因为 spark 无法弄清楚 pandas 类型 object
。但要确保这些列确实是字符串类型。
现在将您的 df 保存在 hdfs 中:
sdf.write.csv('mycsv.csv')