名称中具有模式的大量列的 Apache Spark 错误记录

Apache Spark bad record for large number of columns having pattern in name

我是 Spark 的新手,正在尝试处理不良记录。我在搜索时发现了以下内容

val df = spark.read
          .option("badRecordsPath", "/tmp/badRecordsPath")
          .schema("Country String, Rank Integer")
          .csv("/tmp/inputFile.csv")

我们正在读取多个文件,需要验证架构。但这里的问题是,在每个输入 CSV 文件中,我们都有一些具有特定模式的列名,并且每个文件中的数字可以是动态的。 例如,在第一个 CSV 文件

中考虑以下 headers
ID, POL_1, POL_2, POL_3, . . .  . POL500

第二个 CSV 文件如下 headers

ID, POL_1, POL_2, POL_3, ...,POL_10

现在对于每个文件,我们有不同数量的 headers 并且具有相似的模式。因此,在那种情况下,我无法对模式字符串或结构字段进行硬编码。有什么办法可以达到同样的目的吗?

我能想到的一个解决方案是每次都预加载输入文件并提取模式,然后应用正则表达式并分配数据类型。但是在这种情况下,我们必须加载两次文件。

任何人都可以帮我解决这个问题,因为我一直在努力解决这个问题。

我有过类似的情况,但是,我的文件类型有限,不知道你是不是这样,我是怎么做的:

  1. 读取 CSV 文件的第一行。
  2. Case/Switch 在第一行选择正确的模式
  3. 摄取文件。
  4. 按名称联合所有数据框(我的目标是最后有一个大数据框)。

可以在此处找到类似的示例:https://github.com/jgperrin/net.jgp.books.spark.ch15/blob/master/src/main/java/net/jgp/books/spark/ch15/lab300_nyc_school_stats/NewYorkSchoolStatisticsApp.java

如果您的 header 是完全可变的,那么您可以根据读取 CSV 的第一行并从中推断架构来动态创建架构。它应该没有那么困难,但您需要确保您的选项对于您的架构构建器和实际的 read() 都是相同的。我会将其隔离在同时执行模式构建和摄取的实用程序函数中。

另一种选择是在没有模式的情况下加载所有内容,然后在 DQ 规范化过程中验证数据。这有点痛苦,但它也可以让你更好地控制你寻找的异常。