无法使用 PySpark 读取 CSV 字符串

Can't read CSV string using PySpark

场景是: EventHub -> Azure Databricks(使用pyspark)

文件格式: CSV(带引号、竖线分隔和自定义架构)

我正在尝试读取来自 eventhub 的 CSV 字符串。 Spark 成功创建了具有正确模式的数据框,但数据框在每条消息后最终为空。

我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切顺利,但当数据来自字符串时却失败了。

所以我找到了一些链接来帮助我解决这个问题,但是 none 有效:

现在我有以下代码:

schema = StructType([StructField("Decisao",StringType(),True), StructField("PedidoID",StringType(),True), StructField("De_LastUpdated",StringType(),True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

df = spark.read \
.option("header", "true") \
.option("mode","FAILFAST") \
.option("delimiter","|") \
.schema(schema) \
.csv(csvData)

df.show()

CSV 文件也可以做到这一点吗?

您可以通过 Rowsplit| 分隔符

上构造这样的模式
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
schemaDF = csvData\
.map(lambda x: x.split("|"))\
.map(lambda x: Row(x[0],\
                   x[1],\
                   x[2],\
                   x[3],\
                   x[4]))\
.toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])

for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')

schemaDF.printSchema()
root
 |-- Decisao: string (nullable = true)
 |-- PedidoID: string (nullable = true)
 |-- De_LastUpdated: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: string (nullable = true)