如何在加载整个数据集之前将自定义数据 formatting/map 应用于每个事件?

How to apply custom data formatting/map to each event before loading the entire dataset?

documentation 将数据流读入 Apache Spark 的标准方法是:

events = spark.readStream \
  .format("json") \           # or parquet, kafka, orc...
  .option() \                 # format specific options
  .schema(my_schema) \        # required
  .load("path/to/data")

但我需要清理一些数据,在应用模式之前重新排列一些字段,我希望会有一个

events = spark.readStream \
  .format("json") \           # or parquet, kafka, orc...
  .option() \                 # format specific options
  .schema(my_schema) \        # required
  **.map(custom_function)**   # apply a custom function to the json object
  .load("path/to/data")

在 Apache Spark 中使用结构化流是否有一种有效的方法来做到这一点?

tl;dr 简而言之,您不能在加载数据集之前执行此操作。

我想到的唯一方法是将您的数据集作为一组字符串加载并使用一系列 withColumnselect 转换来清理它们,实际上是您的 .map(custom_function).

同意 Jacek 的回答。更具体地说,您有两个选择:

  1. 应用输入数据的 "super-schema",然后对您想要的模式进行操作。当 (a) 所有数据都有效 JSON 并且 (b) "super-schema" 比较稳定时,这是最好的方法,例如,不存在动态字段名称。

  2. 作为文本读取,使用 json4s(或您选择的其他库)进行解析,根据需要进行操作。这是最好的方法,如果 (a) 任何输入行可能无效 JSON 或 (b) 没有稳定的 "super-schema".