pyspark 使用用户指定的模式读取 csv - 返回所有 StringType

pyspark read csv with user specified schema - returned all StringType

pyspark 的新手。我正在尝试使用具有用户指定模式结构类型的 pyspark 从 datalake blob 读取 csv 文件。下面是我试过的代码。

from pyspark.sql.types import *

customschema = StructType([
StructField("A", StringType(), True)
,StructField("B", DoubleType(), True)
,StructField("C", TimestampType(), True)
])

df_1 = spark.read.format("csv").options(header="true", schema=customschema, multiline="true", enforceSchema='true').load(destinationPath)

df_1.show()

输出:

+---------+------+--------------------+
|        A|     B|                   C|
+---------+------+--------------------+
|322849691|9547.0|2020-09-24 07:30:...|
|322847371| 492.0|2020-09-23 13:15:...|
|322329853|6661.0|2020-09-07 09:45:...|
|322283810| 500.0|2020-09-04 13:12:...|
|322319107| 251.0|2020-09-02 13:51:...|
|322319096| 254.0|2020-09-02 13:51:...|
+---------+------+--------------------+

但我得到的字段类型是字符串。我不太确定我做错了什么。

df_1.printSchema()

输出:

root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)

它适用于以下语法

customschema=StructType([
    StructField("A",StringType(), True),
    StructField("B",DoubleType(), True),
    StructField("C",TimestampType(), True)
])

df = spark.read.csv("test.csv", header=True, sep=";", schema=customschema)

df.show()
df.printSchema()

或者您也可以使用

df = spark.read.load("test.csv",format="csv", sep=";", schema=customschema, header="true")

有趣的是,read().option().load() 语法对我也不起作用。我不确定它是否有效。至少根据文档 .options() 仅用于 write(),因此导出数据框。

另一种选择是之后转换数据类型

import pyspark.sql.functions as f

df=(df
      .withColumn("B",f.col("B").cast("string"))
      .withColumn("B",f.col("B").cast("double"))
      .withColumn("C",f.col("C").cast("timestamp"))
     )

当您使用 DataFrameReader load 方法时,您应该使用 schema 而不是在选项中传递架构:

df_1 = spark.read.format("csv") \
    .options(header="true", multiline="true")\
    .schema(customschema).load(destinationPath)

这与接受 schema 作为参数的 API 方法 spark.read.csv 不同:

df_1 = spark.read.csv(destinationPath, schema=customschema, header=True)