Spark SQL - 加载 csv/psv 文件,其中包含一些格式错误的记录

Spark SQL - loading csv/psv files with some malformed records

我们正在使用 Spark 加载文件目录的层次结构并将它们转换为 Parquet。数百个管道分隔的文件中有数十 GB。有些本身就很大。

例如,每个第 100 个文件都有一两行有一个额外的分隔符,使整个过程(或文件)中止。

我们正在加载使用:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .load(glob)

Spark 是否有任何扩展或事件处理机制,我们可以附加到读取行的逻辑,如果遇到格式错误的行,只是跳过该行而不是使处理失败?

(我们计划进行更多预处理,但这将是最直接和最关键的修复。)

在你的情况下,它可能不是它的 Spark 解析部分失败,而是默认值实际上是 PERMISSIVE 的事实,这样它将尽力而为地解析为格式错误的记录,然后导致问题处理逻辑的更下游。

您应该能够简单地添加选项:

.option("mode", "DROPMALFORMED")

像这样:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .option("mode", "DROPMALFORMED")
        .load(glob)

它会跳过分隔符数量不正确或与模式不匹配的行,而不是让它们在稍后的代码中导致错误。