如何在保留现有架构的同时从行中创建 DataFrame?
How to create a DataFrame out of rows while retaining existing schema?
如果我调用 map 或 mapPartition
并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?
目前我在做类似的事情:
def combine(partition):
rows = [x for x in partition]
dfpart = pd.DataFrame(rows,columns=rows[0].keys())
pandafunc(dfpart)
mydf.mapPartition(combine)
为了创建 spark SQL 数据框,您需要一个配置单元上下文:
hc = HiveContext(sparkContext)
使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:
sparkSQLdataframe = hc.inferSchema(rows)
您可以使用 toPandas()
,
pandasdf = mydf.toPandas()
Spark >= 2.3.0
从 Spark 2.3.0 开始,可以按分区或组使用 Pandas Series
或 DataFrame
。参见示例:
- Applying UDFs on GroupedData in PySpark (with functioning python example)
Spark < 2.3.0
what is the natural way to create either a local PySpark
没有这样的事情。 Spark 分布式数据结构不能嵌套,或者您更喜欢另一个不能嵌套操作或转换的角度。
or Pandas DataFrame
相对容易,但你至少要记住几件事:
- Pandas 和 Spark DataFrames 甚至根本不等同。这些是不同的结构,具有不同的属性,通常不能用一个替换另一个。
- 分区可以为空。
- 你好像在传递字典。请记住,基础 Python 字典是无序的(例如与
collections.OrderedDict
不同)。因此传递列可能无法按预期工作。
import pandas as pd
rdd = sc.parallelize([
{"x": 1, "y": -1},
{"x": -3, "y": 0},
{"x": -0, "y": 4}
])
def combine(iter):
rows = list(iter)
return [pd.DataFrame(rows)] if rows else []
rdd.mapPartitions(combine).first()
## x y
## 0 1 -1
实际上可以在执行程序中将 Spark 行转换为 Pandas,并最终使用 mapPartitions
从这些输出中创建 Spark DataFrame。 See my gist in Github
# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
# convert rows to dict
rows = (row_.asDict() for row_ in rdd_)
# create pandas dataframe
pdf = pd.DataFrame(rows)
# Rows/Pandas DF can be empty depending on patiition logic.
# Make sure to check it here, otherwise it will throw untrackable error
if len(pdf) > 0:
#
# Do something with pandas DataFrame
#
pass
return pdf.to_dict(orient='records')
# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))
如果我调用 map 或 mapPartition
并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?
目前我在做类似的事情:
def combine(partition):
rows = [x for x in partition]
dfpart = pd.DataFrame(rows,columns=rows[0].keys())
pandafunc(dfpart)
mydf.mapPartition(combine)
为了创建 spark SQL 数据框,您需要一个配置单元上下文:
hc = HiveContext(sparkContext)
使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:
sparkSQLdataframe = hc.inferSchema(rows)
您可以使用 toPandas()
,
pandasdf = mydf.toPandas()
Spark >= 2.3.0
从 Spark 2.3.0 开始,可以按分区或组使用 Pandas Series
或 DataFrame
。参见示例:
- Applying UDFs on GroupedData in PySpark (with functioning python example)
Spark < 2.3.0
what is the natural way to create either a local PySpark
没有这样的事情。 Spark 分布式数据结构不能嵌套,或者您更喜欢另一个不能嵌套操作或转换的角度。
or Pandas DataFrame
相对容易,但你至少要记住几件事:
- Pandas 和 Spark DataFrames 甚至根本不等同。这些是不同的结构,具有不同的属性,通常不能用一个替换另一个。
- 分区可以为空。
- 你好像在传递字典。请记住,基础 Python 字典是无序的(例如与
collections.OrderedDict
不同)。因此传递列可能无法按预期工作。
import pandas as pd
rdd = sc.parallelize([
{"x": 1, "y": -1},
{"x": -3, "y": 0},
{"x": -0, "y": 4}
])
def combine(iter):
rows = list(iter)
return [pd.DataFrame(rows)] if rows else []
rdd.mapPartitions(combine).first()
## x y
## 0 1 -1
实际上可以在执行程序中将 Spark 行转换为 Pandas,并最终使用 mapPartitions
从这些输出中创建 Spark DataFrame。 See my gist in Github
# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
# convert rows to dict
rows = (row_.asDict() for row_ in rdd_)
# create pandas dataframe
pdf = pd.DataFrame(rows)
# Rows/Pandas DF can be empty depending on patiition logic.
# Make sure to check it here, otherwise it will throw untrackable error
if len(pdf) > 0:
#
# Do something with pandas DataFrame
#
pass
return pdf.to_dict(orient='records')
# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))