如何在使用 spark 读取 xml 时识别或重新路由坏的 xml

How to identify or reroute bad xml's when reading xmls with spark

使用 spark,我试图从路径中读取一堆 xmls,其中一个文件是一个虚拟文件,它不是 xml.

我希望 spark 以任何方式告诉我某个特定文件无效

添加 "badRecordsPath" otiton 将坏数据写入 JSON 文件的指定位置,但同样不适用于 xml,还有其他方法吗?

df = (spark.read.format('json')
      .option('badRecordsPath','/tmp/data/failed')
      .load('/tmp/data/dummy.json')

据我所知....不幸的是,它直到今天才以 声明方式 [= 在 xml spark 包中可用28=]...如您所愿...

Json 自从 FailureSafeParser 如下实施以来,它一直在工作......在 DataFrameReader

/**
   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
   *
   * Unless the schema is specified using `schema` function, this function goes through the
   * input once to determine the input schema.
   *
   * @param jsonDataset input Dataset with one JSON object per record
   * @since 2.2.0
   */
  def json(jsonDataset: Dataset[String]): DataFrame = {
    val parsedOptions = new JSONOptions(
      extraOptions.toMap,
      sparkSession.sessionState.conf.sessionLocalTimeZone,
      sparkSession.sessionState.conf.columnNameOfCorruptRecord)

    val schema = userSpecifiedSchema.getOrElse {
      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
    }

    ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
    val actualSchema =
      StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

    val createParser = CreateJacksonParser.string _
    val parsed = jsonDataset.rdd.mapPartitions { iter =>
      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
      val parser = new FailureSafeParser[String](
        input => rawParser.parse(input, createParser, UTF8String.fromString),
        parsedOptions.parseMode,
        schema,
        parsedOptions.columnNameOfCorruptRecord)
      iter.flatMap(parser.parse)
    }
    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
  }

您可以通过编程方式实现功能。
使用 sc.textFile 读取文件夹中的所有文件。 foreach 文件使用 xml 解析器解析条目。

如果它有效重定向到另一个路径。

如果无效,则写入坏记录路径。