pyspark 使用用户指定的模式读取 csv - 返回所有 StringType
pyspark read csv with user specified schema - returned all StringType
pyspark 的新手。我正在尝试使用具有用户指定模式结构类型的 pyspark 从 datalake blob 读取 csv 文件。下面是我试过的代码。
from pyspark.sql.types import *
customschema = StructType([
StructField("A", StringType(), True)
,StructField("B", DoubleType(), True)
,StructField("C", TimestampType(), True)
])
df_1 = spark.read.format("csv").options(header="true", schema=customschema, multiline="true", enforceSchema='true').load(destinationPath)
df_1.show()
输出:
+---------+------+--------------------+
| A| B| C|
+---------+------+--------------------+
|322849691|9547.0|2020-09-24 07:30:...|
|322847371| 492.0|2020-09-23 13:15:...|
|322329853|6661.0|2020-09-07 09:45:...|
|322283810| 500.0|2020-09-04 13:12:...|
|322319107| 251.0|2020-09-02 13:51:...|
|322319096| 254.0|2020-09-02 13:51:...|
+---------+------+--------------------+
但我得到的字段类型是字符串。我不太确定我做错了什么。
df_1.printSchema()
输出:
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- C: string (nullable = true)
它适用于以下语法
customschema=StructType([
StructField("A",StringType(), True),
StructField("B",DoubleType(), True),
StructField("C",TimestampType(), True)
])
df = spark.read.csv("test.csv", header=True, sep=";", schema=customschema)
df.show()
df.printSchema()
或者您也可以使用
df = spark.read.load("test.csv",format="csv", sep=";", schema=customschema, header="true")
有趣的是,read().option().load() 语法对我也不起作用。我不确定它是否有效。至少根据文档 .options() 仅用于 write(),因此导出数据框。
另一种选择是之后转换数据类型
import pyspark.sql.functions as f
df=(df
.withColumn("B",f.col("B").cast("string"))
.withColumn("B",f.col("B").cast("double"))
.withColumn("C",f.col("C").cast("timestamp"))
)
当您使用 DataFrameReader load
方法时,您应该使用 schema
而不是在选项中传递架构:
df_1 = spark.read.format("csv") \
.options(header="true", multiline="true")\
.schema(customschema).load(destinationPath)
这与接受 schema
作为参数的 API 方法 spark.read.csv
不同:
df_1 = spark.read.csv(destinationPath, schema=customschema, header=True)
pyspark 的新手。我正在尝试使用具有用户指定模式结构类型的 pyspark 从 datalake blob 读取 csv 文件。下面是我试过的代码。
from pyspark.sql.types import *
customschema = StructType([
StructField("A", StringType(), True)
,StructField("B", DoubleType(), True)
,StructField("C", TimestampType(), True)
])
df_1 = spark.read.format("csv").options(header="true", schema=customschema, multiline="true", enforceSchema='true').load(destinationPath)
df_1.show()
输出:
+---------+------+--------------------+
| A| B| C|
+---------+------+--------------------+
|322849691|9547.0|2020-09-24 07:30:...|
|322847371| 492.0|2020-09-23 13:15:...|
|322329853|6661.0|2020-09-07 09:45:...|
|322283810| 500.0|2020-09-04 13:12:...|
|322319107| 251.0|2020-09-02 13:51:...|
|322319096| 254.0|2020-09-02 13:51:...|
+---------+------+--------------------+
但我得到的字段类型是字符串。我不太确定我做错了什么。
df_1.printSchema()
输出:
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- C: string (nullable = true)
它适用于以下语法
customschema=StructType([
StructField("A",StringType(), True),
StructField("B",DoubleType(), True),
StructField("C",TimestampType(), True)
])
df = spark.read.csv("test.csv", header=True, sep=";", schema=customschema)
df.show()
df.printSchema()
或者您也可以使用
df = spark.read.load("test.csv",format="csv", sep=";", schema=customschema, header="true")
有趣的是,read().option().load() 语法对我也不起作用。我不确定它是否有效。至少根据文档 .options() 仅用于 write(),因此导出数据框。
另一种选择是之后转换数据类型
import pyspark.sql.functions as f
df=(df
.withColumn("B",f.col("B").cast("string"))
.withColumn("B",f.col("B").cast("double"))
.withColumn("C",f.col("C").cast("timestamp"))
)
当您使用 DataFrameReader load
方法时,您应该使用 schema
而不是在选项中传递架构:
df_1 = spark.read.format("csv") \
.options(header="true", multiline="true")\
.schema(customschema).load(destinationPath)
这与接受 schema
作为参数的 API 方法 spark.read.csv
不同:
df_1 = spark.read.csv(destinationPath, schema=customschema, header=True)