Pyspark:使用新模式读取 ORC 文件

Pyspark: Read ORC files with new schema

我最初使用以下架构将 csv 数据转换为 orc 格式。这是每天发生的拉动。

MySchema = StructType([
    StructField("RetailUnit", StringType()),
    StructField("RetailUnitSysCode", IntegerType())])

大约一个月后,我 运行 遇到其中一列的一些问题,需要将类型更改为字符串,如下所示:

MySchema = StructType([
    StructField("RetailUnit", StringType()),
    StructField("RetailUnitSysCode", StringType())])

现在,如果我读入整个数据集并尝试显示:

alloc = spark.read.orc(f"tables/orc/alloc/")

alloc.select('RetailUnitSysCode').show()

我得到一个空指针异常:

An error occurred while calling o2302.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 168.0 failed 4 times, most recent failure: Lost task 0.3 in stage 168.0 (TID 29923, ip-172-31-45-122.ec2.internal, executor 565): java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

有没有办法使用新模式读取 orc 数据并填充空值,从而使数据集可用?现在,如果我检查我的数据框上的数据类型,我有 ('RetailUnitSysCode', 'int')

如果您事先知道架构,为什么不读取数据框中的 ORC 文件并使用简单的 select 函数调用进行转换。

您可以尝试这样的操作:

import org.apache.spark.sql.functions.col

alloc = spark.read.orc(f"tables/orc/alloc/")

myschemaDef = [("RetailUnit", "string"),"RetailUnitSysCode", "integer")]

columnExprs = [col(elem[0]).as(elem[1]) for elem in myschemaDef]

transformed_df = alloc.select(*columnExprs)

这应该会更改 Dataframe 中的数据类型。当您将其写回持久存储(如 HDFS、Hive 等)时,应维护这些。

注意:*columnExprs 中的 "*" 允许我们解压缩列表。并且是一个常见的 python 列表解包功能以传递多个参数。