Is it possible to prevent appending a file if the schema is not correct?
The example below shows that Spark allows appending data even when it has an extra column (a different schema). Is there a way to prevent this? In principle a Parquet file contains its schema, so is there a way to get this check automatically?
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import os
# imports for random dataset creation
import random
import string
spark = SparkSession.builder.appName('learn').master('yarn').enableHiveSupport().getOrCreate()
# first save
schema1 = StructType([
StructField('id_inside', LongType(), nullable=False),
StructField('name_inside', StringType(), nullable=False),
])
data1 = [[random.randint(0, 5), ''.join(random.choice(string.ascii_lowercase) for _ in range(10))] for _ in range(10)]
df1 = spark.createDataFrame(data1, schema=schema1)
df1.write.format('parquet').mode(saveMode='overwrite').save('/tmp/df1_2')
# df1.show()
# +---------+-----------+
# |id_inside|name_inside|
# +---------+-----------+
# | 0| krohfjcwwo|
# | 0| cvkwmuddxf|
# | 5| rsxtjdfwjv|
os.system('hadoop fs -ls /tmp/df1_2')
# Found 3 items
# -rw-r--r-- 3 sdap hdfs 0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r-- 3 sdap hdfs 723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# append data using a different schema
schema2 = StructType([
StructField('id_inside', LongType(), nullable=False),
StructField('name_inside', StringType(), nullable=False),
StructField('name_inside2', StringType(), nullable=False),
])
data2 = [[random.randint(0, 5), *[''.join(random.choice(string.ascii_lowercase) for _ in range(10))] * 2] for _ in range(10)]  # the same random string fills both name columns
df2 = spark.createDataFrame(data2, schema=schema2)
df2.write.format('parquet').mode(saveMode='append').save('/tmp/df1_2')
os.system('hadoop fs -ls /tmp/df1_2')
# Found 5 items
# -rw-r--r-- 3 sdap hdfs 0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r-- 3 sdap hdfs 1006 2021-07-30 17:58 /tmp/df1_2/part-00000-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 1003 2021-07-30 17:58 /tmp/df1_2/part-00001-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# read the file back in and check the schema
res = spark.read.format('parquet').load('/tmp/df1_2')
res.sample(withReplacement=False, fraction=0.1).show()
# +---------+-----------+------------+
# |id_inside|name_inside|name_inside2|
# +---------+-----------+------------+
# | 5| gmafmuprti| gmafmuprti|
# | 3| ttshihunbe| ttshihunbe|
# | 2| dlrpqnzwrz| null|
# +---------+-----------+------------+
res.printSchema()
# root
# |-- id_inside: long (nullable = true)
# |-- name_inside: string (nullable = true)
# |-- name_inside2: string (nullable = true)
No. That would go against the whole concept of schema evolution. You need to compare the current schema with the new schema yourself, using if logic.
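A minimal sketch of such a guard, reusing the spark session from the question (the helper name append_if_schema_matches is mine, and it assumes the target path already holds Parquet data). One caveat, visible in the printSchema() output above: Spark reports every Parquet column as nullable on read, so comparing full StructType equality (which includes nullability) would reject even a matching write made with nullable=False fields; comparing (name, dataType) pairs sidesteps that.

def append_if_schema_matches(df, path):
    """Append df to a Parquet path only when the column names and types match.

    Hypothetical helper; assumes `path` already contains Parquet data
    and that `spark` is the session created above.
    """
    existing = spark.read.format('parquet').load(path)
    # Compare (name, dataType) pairs; nullability is deliberately ignored
    # because Spark reports all Parquet columns as nullable on read.
    existing_fields = [(f.name, f.dataType) for f in existing.schema.fields]
    new_fields = [(f.name, f.dataType) for f in df.schema.fields]
    if existing_fields != new_fields:
        raise ValueError(
            f'Schema mismatch: existing {existing_fields} vs new {new_fields}')
    df.write.format('parquet').mode(saveMode='append').save(path)

# append_if_schema_matches(df2, '/tmp/df1_2')  # would raise ValueError
# append_if_schema_matches(df1, '/tmp/df1_2')  # would append normally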