在 Spark 中,如何将多个数据帧转换为 avro?
In Spark, How to convert multiple dataframes into an avro?
我有一个 Spark 作业,可以将一些数据处理成几个单独的数据帧。我将这些数据帧存储在一个列表中,即数据帧[]。最终,我想将这些数据帧组合成分层格式并用 avro 编写输出。 avro 模式是这样的:
{
"name": "mydata",
"type": "record",
"fields": [
{"name": "data", "type": {
"type": "array", "items": {
"name": "actualData", "type": "record", "fields": [
{"name": "metadata1", "type": "int"},
{"name": "metadata2", "type": "string"},
{"name": "dataframe", "type": {
"type": "array", "items": {
"name": "dataframeRecord", "type": "record", "fields": [
{"name": "field1", "type": "int"},
{"name": "field2", "type": "int"},
{"name": "field3", "type": ["string", "null"]}]
}
}
}]
}
}
}
]
}
可以推断,每个数据帧都有三个字段,field1、field2和field3,我想将其作为数组写入avro文件中。还有一些与每个数据框关联的元数据。
我目前的做法是,一旦处理完这些数据,将数据帧写入S3,然后使用单独的程序从S3中拉取这些数据,使用avro库编写一个avro文件,然后上传它再次到S3。
但是,随着数据量的增长,这变得非常缓慢。我查看了 databricks 库以直接编写 avro 文件,但我不知道如何在内存中将数据帧组合在一起,或者 databricks 库如何确定我正在使用的模式。
在 Spark 中是否有惯用的方法来做到这一点?
P.S。我在 Python.
中将 EMR 与 Spark 2.0.0 结合使用
如果模式相同,而您只想将所有记录放入同一个 DataFrame 中,则可以使用 DataFrame unionAll 方法。
http://spark.apache.org/docs/1.6.3/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionAll
此函数将获取一个数据帧并将其附加到另一个数据帧。问题是它假定列在两者之间的顺序相同,因此您可能需要做一些工作来让它们对齐并为任何缺失的列创建空列。这是我用来安全联合多个数据帧的 python 函数
def union_multiple_dataframes(iterable_list_df):
input_dfs = list(iterable_list_df)
# First figure out all the field names
field_types = {}
for df in input_dfs:
for field in df.schema.fields:
# Check for type mismatch
if field in field_types:
if field.dataType != field_types[field.name]:
raise ValueError("Mismatched data types when unioning dataframes for field: {}".format(field))
else:
field_types[field.name] = field.dataType
# First add in empty fields so all df's have the same schema
fields = set(field_types.keys())
for i, df in enumerate(input_dfs):
missing = fields - set(df.schema.names)
for field in missing:
df = df.withColumn(field, F.lit(None))
input_dfs[i] = df
# Finally put all the df's columns in the same order, and do the actual union
sorted_dfs = [df.select(*sorted(fields)) for df in iterable_list_df]
return reduce(lambda x, y: x.unionAll(y), sorted_dfs)
示例用法如下:
input_dfs = [do_something(..) for x in y]
combined_df = union_multiple_dataframes(input_dfs)
combined_df.write.format("com.databricks.spark.avro").save("s3://my-bucket/path")
我找到了一个特定于 PySpark 的解决方案:
对于每个数据框,我使用 .collect() 来获取行列表。对于每个 Row 对象,我调用 asDict() 来获取字典。从那里,我能够用一个简单的循环构建一个字典列表。一旦我有了这个字典列表,数据就会退出 Spark 并进入纯粹的 Python 领域,并且需要 "easier" 来处理(但效率较低)。
或者,如果我选择 Scala 而不是 Python,我可能已经能够将数据框转换为数据集,它似乎提供了一些方法来执行我需要的操作,但那是完全是另一个故事。
我有一个 Spark 作业,可以将一些数据处理成几个单独的数据帧。我将这些数据帧存储在一个列表中,即数据帧[]。最终,我想将这些数据帧组合成分层格式并用 avro 编写输出。 avro 模式是这样的:
{
"name": "mydata",
"type": "record",
"fields": [
{"name": "data", "type": {
"type": "array", "items": {
"name": "actualData", "type": "record", "fields": [
{"name": "metadata1", "type": "int"},
{"name": "metadata2", "type": "string"},
{"name": "dataframe", "type": {
"type": "array", "items": {
"name": "dataframeRecord", "type": "record", "fields": [
{"name": "field1", "type": "int"},
{"name": "field2", "type": "int"},
{"name": "field3", "type": ["string", "null"]}]
}
}
}]
}
}
}
]
}
可以推断,每个数据帧都有三个字段,field1、field2和field3,我想将其作为数组写入avro文件中。还有一些与每个数据框关联的元数据。
我目前的做法是,一旦处理完这些数据,将数据帧写入S3,然后使用单独的程序从S3中拉取这些数据,使用avro库编写一个avro文件,然后上传它再次到S3。
但是,随着数据量的增长,这变得非常缓慢。我查看了 databricks 库以直接编写 avro 文件,但我不知道如何在内存中将数据帧组合在一起,或者 databricks 库如何确定我正在使用的模式。
在 Spark 中是否有惯用的方法来做到这一点?
P.S。我在 Python.
中将 EMR 与 Spark 2.0.0 结合使用如果模式相同,而您只想将所有记录放入同一个 DataFrame 中,则可以使用 DataFrame unionAll 方法。
http://spark.apache.org/docs/1.6.3/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionAll
此函数将获取一个数据帧并将其附加到另一个数据帧。问题是它假定列在两者之间的顺序相同,因此您可能需要做一些工作来让它们对齐并为任何缺失的列创建空列。这是我用来安全联合多个数据帧的 python 函数
def union_multiple_dataframes(iterable_list_df):
input_dfs = list(iterable_list_df)
# First figure out all the field names
field_types = {}
for df in input_dfs:
for field in df.schema.fields:
# Check for type mismatch
if field in field_types:
if field.dataType != field_types[field.name]:
raise ValueError("Mismatched data types when unioning dataframes for field: {}".format(field))
else:
field_types[field.name] = field.dataType
# First add in empty fields so all df's have the same schema
fields = set(field_types.keys())
for i, df in enumerate(input_dfs):
missing = fields - set(df.schema.names)
for field in missing:
df = df.withColumn(field, F.lit(None))
input_dfs[i] = df
# Finally put all the df's columns in the same order, and do the actual union
sorted_dfs = [df.select(*sorted(fields)) for df in iterable_list_df]
return reduce(lambda x, y: x.unionAll(y), sorted_dfs)
示例用法如下:
input_dfs = [do_something(..) for x in y]
combined_df = union_multiple_dataframes(input_dfs)
combined_df.write.format("com.databricks.spark.avro").save("s3://my-bucket/path")
我找到了一个特定于 PySpark 的解决方案:
对于每个数据框,我使用 .collect() 来获取行列表。对于每个 Row 对象,我调用 asDict() 来获取字典。从那里,我能够用一个简单的循环构建一个字典列表。一旦我有了这个字典列表,数据就会退出 Spark 并进入纯粹的 Python 领域,并且需要 "easier" 来处理(但效率较低)。
或者,如果我选择 Scala 而不是 Python,我可能已经能够将数据框转换为数据集,它似乎提供了一些方法来执行我需要的操作,但那是完全是另一个故事。