Pyspark - 合并多个 ORC 模式

Pyspark - Merge multiple ORC schemas

我有 2 个不同的目录,它们下面有一个 ORC 文件。这 2 个文件具有不同的架构。将两个目录读入同一个 DataFrame 后,最终模式取决于路径的顺序。

考虑使用以下代码来复制它:

data = [
    (1, "player1", "google.com", True),
    (2, "player1", "youtube.com", True),
    (3, "player2", "facebook.com", True),
    (4, "player2", "record.pt", True),
    (5, "player2", "yahoo.com", True),
    (6, "player3", "facebook.com", False),
    (7, "player3", "record.pt", True),
    (8, "player3", "yahoo.com", True),
    (9, "player4", "", True),
    (10, "player4", "record.pt", True),
    (11, "player4", "abola.pt", True),
    (12, "player4", None, True)
]

data2 = [
    (13, "player1", True),
    (14, "player2", True),
    (15, "player3", True),
    (16, "player4", True),
    (17, "player3", True),
    (18, "player3", True),
]

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])

df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')

df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])

df.show()

这会产生输出:

+---+-------+-----+---+                                                         
| id|splayer| bool|bla|
+---+-------+-----+---+
|  1|player1| true|  1|
|  2|player1| true|  1|
|  3|player2| true|  1|
|  4|player2| true|  1|
|  5|player2| true|  1|
|  6|player3|false|  1|
|  7|player3| true|  1|
|  8|player3| true|  1|
|  9|player4| true|  1|
| 10|player4| true|  1|
| 11|player4| true|  1|
| 12|player4| true|  1|
| 13|player1| true|  2|
| 14|player2| true|  2|
| 15|player3| true|  2|
| 16|player4| true|  2|
| 17|player3| true|  2|
| 18|player3| true|  2|
+---+-------+-----+---+

如果我更改目录的顺序,将生成以下输出:

+---+-------+------------+-----+---+                                            
| id|splayer|     website| bool|bla|
+---+-------+------------+-----+---+
|  1|player1|  google.com| true|  1|
|  2|player1| youtube.com| true|  1|
|  3|player2|facebook.com| true|  1|
|  4|player2|   record.pt| true|  1|
|  5|player2|   yahoo.com| true|  1|
|  6|player3|facebook.com|false|  1|
|  7|player3|   record.pt| true|  1|
|  8|player3|   yahoo.com| true|  1|
|  9|player4|            | true|  1|
| 10|player4|   record.pt| true|  1|
| 11|player4|    abola.pt| true|  1|
| 12|player4|        null| true|  1|
| 13|player1|        null| true|  2|
| 14|player2|        null| true|  2|
| 15|player3|        null| true|  2|
| 16|player4|        null| true|  2|
| 17|player3|        null| true|  2|
| 18|player3|        null| true|  2|
+---+-------+------------+-----+---+

当我研究这个问题时,我发现有几个帖子说 option("mergeSchema", "true") 是一个解决方案。事实上,为此有一个 pull request

是否有解决方案或者它仍然是一个悬而未决的问题?

我正在使用 (Py)Spark 2.4.3 和 Python 3.6.8。

提前致谢!

更新:

上述PR仅适用于Spark 3.0.0。感谢@Shaido 提供的信息。

由于某些供应商数据的架构演变,我遇到了同样的问题。 我一直在尝试一些不同的想法,因为 ORC mergeSchema 选项在 Spark 3.0 之前不可用,我们是 运行 2.3 我的第一个想法是用我的完整模式创建一个空数据框,包括任何新列,并将其作为 ORC 文件保存到按字母顺序排在第一位的目录中。例如,如果我的数据按 load_date 分区,那么我将拥有 load_date=00000000、load_date=20200501、load_date=20200601 等文件夹。 然后我会将具有完整模式的空数据框放在 00000000 分区中。 这行得通,但并不是那么干净,我不确定是否存在 ORC reader 不会以某种方式选择不同的 ORC 文件作为架构基础的边缘情况。 因此,我想到了为 ORC reader 提供一个包含我需要的所有列的模式,这很有效。

schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')

在 HDFS 的 orc_schema_evolution 文件夹中,我们有分区的 load_date 文件夹,其中一些 ORC 文件具有架构('state'、'population'),而其他文件具有架构的 ('state','population','abbr')。请注意,我什至可以使用此方法重新排列现有列的顺序。