如何将 pandas DataFrame 的 rdd 转换为 Spark DataFrame
How to convert a rdd of pandas DataFrame to Spark DataFrame
我创建了一个 pandas DataFrame 的 rdd 作为中间结果。我想转换一个 Spark DataFrame,最终将它保存到 parquet 文件中。
我想知道什么是有效的方法。
谢谢
def create_df(x):
return pd.DataFrame(np.random.rand(5, 3)).\
assign(col=x)
sc.parallelize(range(5)).map(create_df).\
.TO_DATAFRAME()..write.format("parquet").save("parquet_file")
我试过pd.concat将rdd缩减为一个大数据帧,似乎不对。
所以谈到效率,因为 spark 2.3 Apache Arrow 与 Spark 集成,它应该在 JVM 和 Python 进程之间有效地传输数据,从而提高从 pandas 数据帧转换的性能激发数据框。您可以通过
启用它
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
如果您的 spark 分布没有集成箭头,这不应引发错误,只会被忽略。
pyspark shell 中 运行 的示例代码如下所示:
import numpy as np
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = pd.DataFrame(np.random.rand(100, 3))
df = spark.createDataFrame(pdf)
df.write.format("parquet").save('data_parquet_file')
您的 create_df 方法 returns 熊猫数据框,您可以从中创建 spark 数据框 - 不确定为什么需要 "sc.parallelize(range(5)).map(create_df)"
所以你的完整代码可以像
import pandas as pd
import numpy as np
def create_df(x):
return pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
pdf = create_df(10)
df = spark.createDataFrame(pdf)
df.write.format("parquet").save('data_parquet_file')
import pandas as pd
def create_df(x):
df=pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
return df.values.tolist()
sc.parallelize(range(5)).flatMap(create_df).toDF().\
.write.format("parquet").save("parquet_file")
我创建了一个 pandas DataFrame 的 rdd 作为中间结果。我想转换一个 Spark DataFrame,最终将它保存到 parquet 文件中。
我想知道什么是有效的方法。
谢谢
def create_df(x):
return pd.DataFrame(np.random.rand(5, 3)).\
assign(col=x)
sc.parallelize(range(5)).map(create_df).\
.TO_DATAFRAME()..write.format("parquet").save("parquet_file")
我试过pd.concat将rdd缩减为一个大数据帧,似乎不对。
所以谈到效率,因为 spark 2.3 Apache Arrow 与 Spark 集成,它应该在 JVM 和 Python 进程之间有效地传输数据,从而提高从 pandas 数据帧转换的性能激发数据框。您可以通过
启用它spark.conf.set("spark.sql.execution.arrow.enabled", "true")
如果您的 spark 分布没有集成箭头,这不应引发错误,只会被忽略。
pyspark shell 中 运行 的示例代码如下所示:
import numpy as np
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = pd.DataFrame(np.random.rand(100, 3))
df = spark.createDataFrame(pdf)
df.write.format("parquet").save('data_parquet_file')
您的 create_df 方法 returns 熊猫数据框,您可以从中创建 spark 数据框 - 不确定为什么需要 "sc.parallelize(range(5)).map(create_df)"
所以你的完整代码可以像
import pandas as pd
import numpy as np
def create_df(x):
return pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
pdf = create_df(10)
df = spark.createDataFrame(pdf)
df.write.format("parquet").save('data_parquet_file')
import pandas as pd
def create_df(x):
df=pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
return df.values.tolist()
sc.parallelize(range(5)).flatMap(create_df).toDF().\
.write.format("parquet").save("parquet_file")