无法使用 PySpark 读取 CSV 字符串
Can't read CSV string using PySpark
场景是: EventHub -> Azure Databricks(使用pyspark)
文件格式: CSV(带引号、竖线分隔和自定义架构)
我正在尝试读取来自 eventhub 的 CSV 字符串。 Spark 成功创建了具有正确模式的数据框,但数据框在每条消息后最终为空。
我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切顺利,但当数据来自字符串时却失败了。
所以我找到了一些链接来帮助我解决这个问题,但是 none 有效:
现在我有以下代码:
schema = StructType([StructField("Decisao",StringType(),True), StructField("PedidoID",StringType(),True), StructField("De_LastUpdated",StringType(),True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
df = spark.read \
.option("header", "true") \
.option("mode","FAILFAST") \
.option("delimiter","|") \
.schema(schema) \
.csv(csvData)
df.show()
CSV 文件也可以做到这一点吗?
您可以通过 Row
和 split
在 |
分隔符
上构造这样的模式
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
schemaDF = csvData\
.map(lambda x: x.split("|"))\
.map(lambda x: Row(x[0],\
x[1],\
x[2],\
x[3],\
x[4]))\
.toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])
for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')
schemaDF.printSchema()
root
|-- Decisao: string (nullable = true)
|-- PedidoID: string (nullable = true)
|-- De_LastUpdated: string (nullable = true)
|-- col4: string (nullable = true)
|-- col5: string (nullable = true)
场景是: EventHub -> Azure Databricks(使用pyspark)
文件格式: CSV(带引号、竖线分隔和自定义架构)
我正在尝试读取来自 eventhub 的 CSV 字符串。 Spark 成功创建了具有正确模式的数据框,但数据框在每条消息后最终为空。
我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切顺利,但当数据来自字符串时却失败了。
所以我找到了一些链接来帮助我解决这个问题,但是 none 有效:
现在我有以下代码:
schema = StructType([StructField("Decisao",StringType(),True), StructField("PedidoID",StringType(),True), StructField("De_LastUpdated",StringType(),True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
df = spark.read \
.option("header", "true") \
.option("mode","FAILFAST") \
.option("delimiter","|") \
.schema(schema) \
.csv(csvData)
df.show()
CSV 文件也可以做到这一点吗?
您可以通过 Row
和 split
在 |
分隔符
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
schemaDF = csvData\
.map(lambda x: x.split("|"))\
.map(lambda x: Row(x[0],\
x[1],\
x[2],\
x[3],\
x[4]))\
.toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])
for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')
schemaDF.printSchema()
root
|-- Decisao: string (nullable = true)
|-- PedidoID: string (nullable = true)
|-- De_LastUpdated: string (nullable = true)
|-- col4: string (nullable = true)
|-- col5: string (nullable = true)