如何在保留现有架构的同时从行中创建 DataFrame?

How to create a DataFrame out of rows while retaining existing schema?

如果我调用 map 或 mapPartition 并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?

目前我在做类似的事情:

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows,columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartition(combine)

为了创建 spark SQL 数据框,您需要一个配置单元上下文:

hc = HiveContext(sparkContext)

使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:

sparkSQLdataframe = hc.inferSchema(rows)  

您可以使用 toPandas(),

pandasdf = mydf.toPandas()

Spark >= 2.3.0

从 Spark 2.3.0 开始,可以按分区或组使用 Pandas SeriesDataFrame。参见示例:

  • Applying UDFs on GroupedData in PySpark (with functioning python example)

Spark < 2.3.0

what is the natural way to create either a local PySpark

没有这样的事情。 Spark 分布式数据结构不能嵌套,或者您更喜欢另一个不能嵌套操作或转换的角度。

or Pandas DataFrame

相对容易,但你至少要记住几件事:

  • Pandas 和 Spark DataFrames 甚至根本不等同。这些是不同的结构,具有不同的属性,通常不能用一个替换另一个。
  • 分区可以为空。
  • 你好像在传递字典。请记住,基础 Python 字典是无序的(例如与 collections.OrderedDict 不同)。因此传递列可能无法按预期工作。
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1}, 
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iter):
    rows = list(iter)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1

实际上可以在执行程序中将 Spark 行转换为 Pandas,并最终使用 mapPartitions 从这些输出中创建 Spark DataFrame。 See my gist in Github

# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
    # convert rows to dict
    rows = (row_.asDict() for row_ in rdd_)
    # create pandas dataframe
    pdf = pd.DataFrame(rows)

    # Rows/Pandas DF can be empty depending on patiition logic.
    # Make sure to check it here, otherwise it will throw untrackable error
    if len(pdf) > 0:
        #
        # Do something with pandas DataFrame 
        #
        pass

    return pdf.to_dict(orient='records')

# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))