可以将与 header 记录长度不同的记录放入 bad_record 目录
Possible to put records that aren't same length as header records to bad_record directory
我正在将文件读入这样的数据框中
val df = spark.read
.option("sep", props.inputSeperator)
.option("header", "true")
.option("badRecordsPath", "/mnt/adls/udf_databricks/error")
.csv(inputLoc)
文件是这样设置的
col_a|col_b|col_c|col_d
1|first|last|
2|this|is|data
3|ok
4|more||stuff
5|||
现在,spark 会将所有这些读取为可接受的数据。但是,我希望 3|ok
被标记为错误记录,因为它的大小与 header 大小不匹配。这可能吗?
val a = spark.sparkContext.textFile(pathOfYourFile)
val size = a.first.split("\|").length
a.filter(i => i.split("\|",-1).size != size).saveAsTextFile("/mnt/adls/udf_databricks/error")
以下代码由 spark.I 的数据块实现支持,在您的代码中看不到模式映射。你能映射它并尝试吗?
.option("badRecordsPath", "/mnt/adls/udf_databricks/error")
像下面这样更改您的代码,
val customSchema = StructType(Array(
StructField("col_a", StringType, true),
StructField("col_b", StringType, true),
StructField("col_c", StringType, true),
StructField("col_d", StringType, true)))
val df = spark.read
.option("sep", props.inputSeperator)
.option("header", "true")
.option("badRecordsPath", "/mnt/adls/udf_databricks/error")
.schema(customSchema)
.csv(inputLoc)
更详细的可以参考Datbricks doc on badrecordspath
谢谢,
卡尔西克
我正在将文件读入这样的数据框中
val df = spark.read
.option("sep", props.inputSeperator)
.option("header", "true")
.option("badRecordsPath", "/mnt/adls/udf_databricks/error")
.csv(inputLoc)
文件是这样设置的
col_a|col_b|col_c|col_d
1|first|last|
2|this|is|data
3|ok
4|more||stuff
5|||
现在,spark 会将所有这些读取为可接受的数据。但是,我希望 3|ok
被标记为错误记录,因为它的大小与 header 大小不匹配。这可能吗?
val a = spark.sparkContext.textFile(pathOfYourFile)
val size = a.first.split("\|").length
a.filter(i => i.split("\|",-1).size != size).saveAsTextFile("/mnt/adls/udf_databricks/error")
以下代码由 spark.I 的数据块实现支持,在您的代码中看不到模式映射。你能映射它并尝试吗?
.option("badRecordsPath", "/mnt/adls/udf_databricks/error")
像下面这样更改您的代码,
val customSchema = StructType(Array(
StructField("col_a", StringType, true),
StructField("col_b", StringType, true),
StructField("col_c", StringType, true),
StructField("col_d", StringType, true)))
val df = spark.read
.option("sep", props.inputSeperator)
.option("header", "true")
.option("badRecordsPath", "/mnt/adls/udf_databricks/error")
.schema(customSchema)
.csv(inputLoc)
更详细的可以参考Datbricks doc on badrecordspath
谢谢, 卡尔西克