使用 RDD 在 PySpark 中创建数据框

Create a dataframe in PySpark using RDD

我正在尝试创建一个函数,它将接受一个字典和模式作为输入,returns一个数据框自动将未指定的字段填充为空值。这是我下面的代码

def get_element(name, row_dict):
    value = None
    if name in row_dict:
        value = row_dict[name]

    return value


def create_row(schema, row_dict):
    row_tuple = ()
    for fields in schema:
        element = get_element(fields.name, row_dict)
        row_tuple = (*row_tuple, element)

    return row_tuple


def fill(schema, values):
    spark = (
        SparkSession
            .builder
            .master("local[*]")
            .appName("pysparktest")
            .getOrCreate()
    )
    return \
        spark.createDataFrame(
            spark.sparkContext.parallelize(
                [(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
            ),
            schema
        )

我是这样调用函数的:

   schema = T.StructType([T.StructField("base_currency", T.StringType()),
                           T.StructField("target_currency", T.StringType()),
                           T.StructField("valid_from", T.StringType()),
                           T.StructField("valid_until", T.StringType())])

    values = [
        {"base_currency": "USD", "target_currency": "EUR", "valid_from": "test",
         "valid_until": "test"},
        {"base_currency": "USD1", "target_currency": "EUR2"}
    ]

    fill(schema, values).show()

错误信息:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
test_utilities/create_df_from_schema.py:37: in fill
    [(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:566: in parallelize
    jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:603: in _serialize_to_jvm
    serializer.dump_stream(data, tempFile)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:211: in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:133: in dump_stream
    self._write_with_length(obj, stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:143: in _write_with_length
    serialized = self.dumps(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = PickleSerializer()
obj = [<generator object fill.<locals>.<genexpr> at 0x1091b9350>]

    def dumps(self, obj):
>       return pickle.dumps(obj, pickle_protocol)
E       TypeError: can't pickle generator objects

../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:427: TypeError

不知何故构造数据框的语法不正确。

您已经从 create_row 函数返回元组,您不需要创建 Row 对象,只需将元组列表传递给 spark.createDataFrame,如下所示:

def fill(schema, values):
    return spark.createDataFrame(
            [create_row(schema.fields, row_dict) for row_dict in values],
            schema
        )

现在您可以拨打:

fill(schema, values).show()

#+-------------+---------------+----------+-----------+
#|base_currency|target_currency|valid_from|valid_until|
#+-------------+---------------+----------+-----------+
#|          USD|            EUR|      test|       test|
#|         USD1|           EUR2|      null|       null|
#+-------------+---------------+----------+-----------+

此外,您实际上可以将代码简化为一行 list-comprehension,而无需定义这些函数:

spark.createDataFrame(
    [[row.get(f.name) for f in schema.fields] for row in values],
    schema
).show()

在字典对象 returns 上调用 .get(key) None 如果 key 不存在。