"log and skip" 在 spark-streaming 中验证转换的正确方法是什么

What's the right way to "log and skip" validated transformations in spark-streaming

我有一个 spark-streaming 应用程序,我想在我的主要操作之前进行一些数据转换,但转换涉及一些数据验证。

当验证失败时,我想记录失败案例,然后继续其余的。

目前,我有这样的代码:

def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???

val validationResults = values.map(validate)

validationResults.foreachRDD { rdd =>
  rdd.foreach {
    case Left(error) => logger.error(error)
    case _           =>
  }
}

val validatedValues: DStream[MyCaseClass] =
  validationResults.mapPartitions { partition =>
    partition.collect { case Right(record) => record }
  }

这目前有效,但感觉我做错了什么。

问题

据我了解,这将执行 validation 两次,这可能是浪费。

我想说解决这个问题的最好方法是利用 stdlib flatMap 接受 Option

def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???

val validatedValues: DStream[MyCaseClass] =
  values.map(validate).flatMap {
    case Left(error) =>
      logger.error(error)
      None

    case Right(record) =>
      Some(record)
  }

你也可以使用 mapPartitions 更冗长一点,这应该会更有效一点。

此处的 'best' 选项在一定程度上取决于您的 spark 作业的其余部分和您的 spark 版本。

理想情况下,您会选择催化剂本身能够理解的机制。 spark3 数据集 observe 侦听器可能就是您要找的东西。 (我还没有看到很多在野外使用它的例子,但它 似乎 这就是这种事情背后的动机。)

在纯 spark sql 中,一种选择是添加一个包含验证结果的新列,例如名为 invalid_reason 的列如果记录有效则为 NULL 或包含列验证失败原因的某些 [枚举] 字符串。此时,您可能希望在执行 groupBy/count/collect/log 操作之前 persist/cache 数据集,然后在持久化数据帧上过滤 where invalid_reason is null 并继续进行其余处理。

tl;dr:考虑添加验证列而不是仅应用 'validate' 函数。然后您 'fork' 在此处处理:记录指定了无效列的记录,在没有指定的记录上处理您的其余工作。它确实会为您的数据框增加一些容量,但不需要两次处理相同的记录。