Spark SQL - 加载 csv/psv 文件,其中包含一些格式错误的记录
Spark SQL - loading csv/psv files with some malformed records
我们正在使用 Spark 加载文件目录的层次结构并将它们转换为 Parquet。数百个管道分隔的文件中有数十 GB。有些本身就很大。
例如,每个第 100 个文件都有一两行有一个额外的分隔符,使整个过程(或文件)中止。
我们正在加载使用:
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.load(glob)
Spark 是否有任何扩展或事件处理机制,我们可以附加到读取行的逻辑,如果遇到格式错误的行,只是跳过该行而不是使处理失败?
(我们计划进行更多预处理,但这将是最直接和最关键的修复。)
在你的情况下,它可能不是它的 Spark 解析部分失败,而是默认值实际上是 PERMISSIVE
的事实,这样它将尽力而为地解析为格式错误的记录,然后导致问题处理逻辑的更下游。
您应该能够简单地添加选项:
.option("mode", "DROPMALFORMED")
像这样:
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load(glob)
它会跳过分隔符数量不正确或与模式不匹配的行,而不是让它们在稍后的代码中导致错误。
我们正在使用 Spark 加载文件目录的层次结构并将它们转换为 Parquet。数百个管道分隔的文件中有数十 GB。有些本身就很大。
例如,每个第 100 个文件都有一两行有一个额外的分隔符,使整个过程(或文件)中止。
我们正在加载使用:
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.load(glob)
Spark 是否有任何扩展或事件处理机制,我们可以附加到读取行的逻辑,如果遇到格式错误的行,只是跳过该行而不是使处理失败?
(我们计划进行更多预处理,但这将是最直接和最关键的修复。)
在你的情况下,它可能不是它的 Spark 解析部分失败,而是默认值实际上是 PERMISSIVE
的事实,这样它将尽力而为地解析为格式错误的记录,然后导致问题处理逻辑的更下游。
您应该能够简单地添加选项:
.option("mode", "DROPMALFORMED")
像这样:
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load(glob)
它会跳过分隔符数量不正确或与模式不匹配的行,而不是让它们在稍后的代码中导致错误。