Pyspark - 合并多个 ORC 模式
Pyspark - Merge multiple ORC schemas
我有 2 个不同的目录,它们下面有一个 ORC 文件。这 2 个文件具有不同的架构。将两个目录读入同一个 DataFrame 后,最终模式取决于路径的顺序。
考虑使用以下代码来复制它:
data = [
(1, "player1", "google.com", True),
(2, "player1", "youtube.com", True),
(3, "player2", "facebook.com", True),
(4, "player2", "record.pt", True),
(5, "player2", "yahoo.com", True),
(6, "player3", "facebook.com", False),
(7, "player3", "record.pt", True),
(8, "player3", "yahoo.com", True),
(9, "player4", "", True),
(10, "player4", "record.pt", True),
(11, "player4", "abola.pt", True),
(12, "player4", None, True)
]
data2 = [
(13, "player1", True),
(14, "player2", True),
(15, "player3", True),
(16, "player4", True),
(17, "player3", True),
(18, "player3", True),
]
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])
df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')
df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])
df.show()
这会产生输出:
+---+-------+-----+---+
| id|splayer| bool|bla|
+---+-------+-----+---+
| 1|player1| true| 1|
| 2|player1| true| 1|
| 3|player2| true| 1|
| 4|player2| true| 1|
| 5|player2| true| 1|
| 6|player3|false| 1|
| 7|player3| true| 1|
| 8|player3| true| 1|
| 9|player4| true| 1|
| 10|player4| true| 1|
| 11|player4| true| 1|
| 12|player4| true| 1|
| 13|player1| true| 2|
| 14|player2| true| 2|
| 15|player3| true| 2|
| 16|player4| true| 2|
| 17|player3| true| 2|
| 18|player3| true| 2|
+---+-------+-----+---+
如果我更改目录的顺序,将生成以下输出:
+---+-------+------------+-----+---+
| id|splayer| website| bool|bla|
+---+-------+------------+-----+---+
| 1|player1| google.com| true| 1|
| 2|player1| youtube.com| true| 1|
| 3|player2|facebook.com| true| 1|
| 4|player2| record.pt| true| 1|
| 5|player2| yahoo.com| true| 1|
| 6|player3|facebook.com|false| 1|
| 7|player3| record.pt| true| 1|
| 8|player3| yahoo.com| true| 1|
| 9|player4| | true| 1|
| 10|player4| record.pt| true| 1|
| 11|player4| abola.pt| true| 1|
| 12|player4| null| true| 1|
| 13|player1| null| true| 2|
| 14|player2| null| true| 2|
| 15|player3| null| true| 2|
| 16|player4| null| true| 2|
| 17|player3| null| true| 2|
| 18|player3| null| true| 2|
+---+-------+------------+-----+---+
当我研究这个问题时,我发现有几个帖子说 option("mergeSchema", "true")
是一个解决方案。事实上,为此有一个 pull request。
是否有解决方案或者它仍然是一个悬而未决的问题?
我正在使用 (Py)Spark 2.4.3 和 Python 3.6.8。
提前致谢!
更新:
上述PR仅适用于Spark 3.0.0。感谢@Shaido 提供的信息。
由于某些供应商数据的架构演变,我遇到了同样的问题。
我一直在尝试一些不同的想法,因为 ORC mergeSchema 选项在 Spark 3.0 之前不可用,我们是 运行 2.3
我的第一个想法是用我的完整模式创建一个空数据框,包括任何新列,并将其作为 ORC 文件保存到按字母顺序排在第一位的目录中。例如,如果我的数据按 load_date 分区,那么我将拥有 load_date=00000000、load_date=20200501、load_date=20200601 等文件夹。
然后我会将具有完整模式的空数据框放在 00000000 分区中。
这行得通,但并不是那么干净,我不确定是否存在 ORC reader 不会以某种方式选择不同的 ORC 文件作为架构基础的边缘情况。
因此,我想到了为 ORC reader 提供一个包含我需要的所有列的模式,这很有效。
schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')
在 HDFS 的 orc_schema_evolution 文件夹中,我们有分区的 load_date 文件夹,其中一些 ORC 文件具有架构('state'、'population'),而其他文件具有架构的 ('state','population','abbr')。请注意,我什至可以使用此方法重新排列现有列的顺序。
我有 2 个不同的目录,它们下面有一个 ORC 文件。这 2 个文件具有不同的架构。将两个目录读入同一个 DataFrame 后,最终模式取决于路径的顺序。
考虑使用以下代码来复制它:
data = [
(1, "player1", "google.com", True),
(2, "player1", "youtube.com", True),
(3, "player2", "facebook.com", True),
(4, "player2", "record.pt", True),
(5, "player2", "yahoo.com", True),
(6, "player3", "facebook.com", False),
(7, "player3", "record.pt", True),
(8, "player3", "yahoo.com", True),
(9, "player4", "", True),
(10, "player4", "record.pt", True),
(11, "player4", "abola.pt", True),
(12, "player4", None, True)
]
data2 = [
(13, "player1", True),
(14, "player2", True),
(15, "player3", True),
(16, "player4", True),
(17, "player3", True),
(18, "player3", True),
]
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])
df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')
df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])
df.show()
这会产生输出:
+---+-------+-----+---+
| id|splayer| bool|bla|
+---+-------+-----+---+
| 1|player1| true| 1|
| 2|player1| true| 1|
| 3|player2| true| 1|
| 4|player2| true| 1|
| 5|player2| true| 1|
| 6|player3|false| 1|
| 7|player3| true| 1|
| 8|player3| true| 1|
| 9|player4| true| 1|
| 10|player4| true| 1|
| 11|player4| true| 1|
| 12|player4| true| 1|
| 13|player1| true| 2|
| 14|player2| true| 2|
| 15|player3| true| 2|
| 16|player4| true| 2|
| 17|player3| true| 2|
| 18|player3| true| 2|
+---+-------+-----+---+
如果我更改目录的顺序,将生成以下输出:
+---+-------+------------+-----+---+
| id|splayer| website| bool|bla|
+---+-------+------------+-----+---+
| 1|player1| google.com| true| 1|
| 2|player1| youtube.com| true| 1|
| 3|player2|facebook.com| true| 1|
| 4|player2| record.pt| true| 1|
| 5|player2| yahoo.com| true| 1|
| 6|player3|facebook.com|false| 1|
| 7|player3| record.pt| true| 1|
| 8|player3| yahoo.com| true| 1|
| 9|player4| | true| 1|
| 10|player4| record.pt| true| 1|
| 11|player4| abola.pt| true| 1|
| 12|player4| null| true| 1|
| 13|player1| null| true| 2|
| 14|player2| null| true| 2|
| 15|player3| null| true| 2|
| 16|player4| null| true| 2|
| 17|player3| null| true| 2|
| 18|player3| null| true| 2|
+---+-------+------------+-----+---+
当我研究这个问题时,我发现有几个帖子说 option("mergeSchema", "true")
是一个解决方案。事实上,为此有一个 pull request。
是否有解决方案或者它仍然是一个悬而未决的问题?
我正在使用 (Py)Spark 2.4.3 和 Python 3.6.8。
提前致谢!
更新:
上述PR仅适用于Spark 3.0.0。感谢@Shaido 提供的信息。
由于某些供应商数据的架构演变,我遇到了同样的问题。 我一直在尝试一些不同的想法,因为 ORC mergeSchema 选项在 Spark 3.0 之前不可用,我们是 运行 2.3 我的第一个想法是用我的完整模式创建一个空数据框,包括任何新列,并将其作为 ORC 文件保存到按字母顺序排在第一位的目录中。例如,如果我的数据按 load_date 分区,那么我将拥有 load_date=00000000、load_date=20200501、load_date=20200601 等文件夹。 然后我会将具有完整模式的空数据框放在 00000000 分区中。 这行得通,但并不是那么干净,我不确定是否存在 ORC reader 不会以某种方式选择不同的 ORC 文件作为架构基础的边缘情况。 因此,我想到了为 ORC reader 提供一个包含我需要的所有列的模式,这很有效。
schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')
在 HDFS 的 orc_schema_evolution 文件夹中,我们有分区的 load_date 文件夹,其中一些 ORC 文件具有架构('state'、'population'),而其他文件具有架构的 ('state','population','abbr')。请注意,我什至可以使用此方法重新排列现有列的顺序。