Pyspark:使用新模式读取 ORC 文件
Pyspark: Read ORC files with new schema
我最初使用以下架构将 csv 数据转换为 orc 格式。这是每天发生的拉动。
MySchema = StructType([
StructField("RetailUnit", StringType()),
StructField("RetailUnitSysCode", IntegerType())])
大约一个月后,我 运行 遇到其中一列的一些问题,需要将类型更改为字符串,如下所示:
MySchema = StructType([
StructField("RetailUnit", StringType()),
StructField("RetailUnitSysCode", StringType())])
现在,如果我读入整个数据集并尝试显示:
alloc = spark.read.orc(f"tables/orc/alloc/")
alloc.select('RetailUnitSysCode').show()
我得到一个空指针异常:
An error occurred while calling o2302.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 168.0 failed 4 times, most recent failure: Lost task 0.3 in stage 168.0 (TID 29923, ip-172-31-45-122.ec2.internal, executor 565): java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
有没有办法使用新模式读取 orc 数据并填充空值,从而使数据集可用?现在,如果我检查我的数据框上的数据类型,我有 ('RetailUnitSysCode', 'int')
如果您事先知道架构,为什么不读取数据框中的 ORC 文件并使用简单的 select
函数调用进行转换。
您可以尝试这样的操作:
import org.apache.spark.sql.functions.col
alloc = spark.read.orc(f"tables/orc/alloc/")
myschemaDef = [("RetailUnit", "string"),"RetailUnitSysCode", "integer")]
columnExprs = [col(elem[0]).as(elem[1]) for elem in myschemaDef]
transformed_df = alloc.select(*columnExprs)
这应该会更改 Dataframe 中的数据类型。当您将其写回持久存储(如 HDFS、Hive 等)时,应维护这些。
注意:*columnExprs 中的 "*" 允许我们解压缩列表。并且是一个常见的 python 列表解包功能以传递多个参数。
我最初使用以下架构将 csv 数据转换为 orc 格式。这是每天发生的拉动。
MySchema = StructType([
StructField("RetailUnit", StringType()),
StructField("RetailUnitSysCode", IntegerType())])
大约一个月后,我 运行 遇到其中一列的一些问题,需要将类型更改为字符串,如下所示:
MySchema = StructType([
StructField("RetailUnit", StringType()),
StructField("RetailUnitSysCode", StringType())])
现在,如果我读入整个数据集并尝试显示:
alloc = spark.read.orc(f"tables/orc/alloc/")
alloc.select('RetailUnitSysCode').show()
我得到一个空指针异常:
An error occurred while calling o2302.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 168.0 failed 4 times, most recent failure: Lost task 0.3 in stage 168.0 (TID 29923, ip-172-31-45-122.ec2.internal, executor 565): java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
有没有办法使用新模式读取 orc 数据并填充空值,从而使数据集可用?现在,如果我检查我的数据框上的数据类型,我有 ('RetailUnitSysCode', 'int')
如果您事先知道架构,为什么不读取数据框中的 ORC 文件并使用简单的 select
函数调用进行转换。
您可以尝试这样的操作:
import org.apache.spark.sql.functions.col
alloc = spark.read.orc(f"tables/orc/alloc/")
myschemaDef = [("RetailUnit", "string"),"RetailUnitSysCode", "integer")]
columnExprs = [col(elem[0]).as(elem[1]) for elem in myschemaDef]
transformed_df = alloc.select(*columnExprs)
这应该会更改 Dataframe 中的数据类型。当您将其写回持久存储(如 HDFS、Hive 等)时,应维护这些。
注意:*columnExprs 中的 "*" 允许我们解压缩列表。并且是一个常见的 python 列表解包功能以传递多个参数。