使用 RDD 在 PySpark 中创建数据框
Create a dataframe in PySpark using RDD
我正在尝试创建一个函数,它将接受一个字典和模式作为输入,returns一个数据框自动将未指定的字段填充为空值。这是我下面的代码
def get_element(name, row_dict):
value = None
if name in row_dict:
value = row_dict[name]
return value
def create_row(schema, row_dict):
row_tuple = ()
for fields in schema:
element = get_element(fields.name, row_dict)
row_tuple = (*row_tuple, element)
return row_tuple
def fill(schema, values):
spark = (
SparkSession
.builder
.master("local[*]")
.appName("pysparktest")
.getOrCreate()
)
return \
spark.createDataFrame(
spark.sparkContext.parallelize(
[(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
),
schema
)
我是这样调用函数的:
schema = T.StructType([T.StructField("base_currency", T.StringType()),
T.StructField("target_currency", T.StringType()),
T.StructField("valid_from", T.StringType()),
T.StructField("valid_until", T.StringType())])
values = [
{"base_currency": "USD", "target_currency": "EUR", "valid_from": "test",
"valid_until": "test"},
{"base_currency": "USD1", "target_currency": "EUR2"}
]
fill(schema, values).show()
错误信息:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_utilities/create_df_from_schema.py:37: in fill
[(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:566: in parallelize
jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:603: in _serialize_to_jvm
serializer.dump_stream(data, tempFile)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:211: in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:133: in dump_stream
self._write_with_length(obj, stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:143: in _write_with_length
serialized = self.dumps(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = PickleSerializer()
obj = [<generator object fill.<locals>.<genexpr> at 0x1091b9350>]
def dumps(self, obj):
> return pickle.dumps(obj, pickle_protocol)
E TypeError: can't pickle generator objects
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:427: TypeError
不知何故构造数据框的语法不正确。
您已经从 create_row
函数返回元组,您不需要创建 Row
对象,只需将元组列表传递给 spark.createDataFrame
,如下所示:
def fill(schema, values):
return spark.createDataFrame(
[create_row(schema.fields, row_dict) for row_dict in values],
schema
)
现在您可以拨打:
fill(schema, values).show()
#+-------------+---------------+----------+-----------+
#|base_currency|target_currency|valid_from|valid_until|
#+-------------+---------------+----------+-----------+
#| USD| EUR| test| test|
#| USD1| EUR2| null| null|
#+-------------+---------------+----------+-----------+
此外,您实际上可以将代码简化为一行 list-comprehension,而无需定义这些函数:
spark.createDataFrame(
[[row.get(f.name) for f in schema.fields] for row in values],
schema
).show()
在字典对象 returns 上调用 .get(key)
None 如果 key
不存在。
我正在尝试创建一个函数,它将接受一个字典和模式作为输入,returns一个数据框自动将未指定的字段填充为空值。这是我下面的代码
def get_element(name, row_dict):
value = None
if name in row_dict:
value = row_dict[name]
return value
def create_row(schema, row_dict):
row_tuple = ()
for fields in schema:
element = get_element(fields.name, row_dict)
row_tuple = (*row_tuple, element)
return row_tuple
def fill(schema, values):
spark = (
SparkSession
.builder
.master("local[*]")
.appName("pysparktest")
.getOrCreate()
)
return \
spark.createDataFrame(
spark.sparkContext.parallelize(
[(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
),
schema
)
我是这样调用函数的:
schema = T.StructType([T.StructField("base_currency", T.StringType()),
T.StructField("target_currency", T.StringType()),
T.StructField("valid_from", T.StringType()),
T.StructField("valid_until", T.StringType())])
values = [
{"base_currency": "USD", "target_currency": "EUR", "valid_from": "test",
"valid_until": "test"},
{"base_currency": "USD1", "target_currency": "EUR2"}
]
fill(schema, values).show()
错误信息:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_utilities/create_df_from_schema.py:37: in fill
[(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:566: in parallelize
jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:603: in _serialize_to_jvm
serializer.dump_stream(data, tempFile)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:211: in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:133: in dump_stream
self._write_with_length(obj, stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:143: in _write_with_length
serialized = self.dumps(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = PickleSerializer()
obj = [<generator object fill.<locals>.<genexpr> at 0x1091b9350>]
def dumps(self, obj):
> return pickle.dumps(obj, pickle_protocol)
E TypeError: can't pickle generator objects
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:427: TypeError
不知何故构造数据框的语法不正确。
您已经从 create_row
函数返回元组,您不需要创建 Row
对象,只需将元组列表传递给 spark.createDataFrame
,如下所示:
def fill(schema, values):
return spark.createDataFrame(
[create_row(schema.fields, row_dict) for row_dict in values],
schema
)
现在您可以拨打:
fill(schema, values).show()
#+-------------+---------------+----------+-----------+
#|base_currency|target_currency|valid_from|valid_until|
#+-------------+---------------+----------+-----------+
#| USD| EUR| test| test|
#| USD1| EUR2| null| null|
#+-------------+---------------+----------+-----------+
此外,您实际上可以将代码简化为一行 list-comprehension,而无需定义这些函数:
spark.createDataFrame(
[[row.get(f.name) for f in schema.fields] for row in values],
schema
).show()
在字典对象 returns 上调用 .get(key)
None 如果 key
不存在。