可以将与 header 记录长度不同的记录放入 bad_record 目录

Possible to put records that aren't same length as header records to bad_record directory

我正在将文件读入这样的数据框中

val df = spark.read
   .option("sep", props.inputSeperator)
   .option("header", "true")
   .option("badRecordsPath", "/mnt/adls/udf_databricks/error")
   .csv(inputLoc)

文件是这样设置的

col_a|col_b|col_c|col_d
1|first|last|
2|this|is|data
3|ok
4|more||stuff
5|||

现在,spark 会将所有这些读取为可接受的数据。但是,我希望 3|ok 被标记为错误记录,因为它的大小与 header 大小不匹配。这可能吗?

val a = spark.sparkContext.textFile(pathOfYourFile)
val size = a.first.split("\|").length
a.filter(i => i.split("\|",-1).size != size).saveAsTextFile("/mnt/adls/udf_databricks/error")

以下代码由 spark.I 的数据块实现支持,在您的代码中看不到模式映射。你能映射它并尝试吗?

.option("badRecordsPath", "/mnt/adls/udf_databricks/error")

像下面这样更改您的代码,

val customSchema = StructType(Array(
    StructField("col_a", StringType, true),
    StructField("col_b", StringType, true),
    StructField("col_c", StringType, true),
    StructField("col_d", StringType, true)))

val df = spark.read
   .option("sep", props.inputSeperator)
   .option("header", "true")
   .option("badRecordsPath", "/mnt/adls/udf_databricks/error")
   .schema(customSchema)
   .csv(inputLoc)

更详细的可以参考Datbricks doc on badrecordspath

谢谢, 卡尔西克