"log and skip" 在 spark-streaming 中验证转换的正确方法是什么
What's the right way to "log and skip" validated transformations in spark-streaming
我有一个 spark-streaming 应用程序,我想在我的主要操作之前进行一些数据转换,但转换涉及一些数据验证。
当验证失败时,我想记录失败案例,然后继续其余的。
目前,我有这样的代码:
def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???
val validationResults = values.map(validate)
validationResults.foreachRDD { rdd =>
rdd.foreach {
case Left(error) => logger.error(error)
case _ =>
}
}
val validatedValues: DStream[MyCaseClass] =
validationResults.mapPartitions { partition =>
partition.collect { case Right(record) => record }
}
这目前有效,但感觉我做错了什么。
问题
据我了解,这将执行 validation
两次,这可能是浪费。
- 使用
values.map(validation).persist()
来解决这个问题是否正确?
- 即使我保留这些值,它仍然会在所有元素上进行两次迭代和模式匹配。感觉应该有一些方法可以用来解决这个问题。在常规的 Scala 集合中,我可能会使用一些猫
TraverseFilter
api,或者使用 fs2.Stream
和 evalMapFilter
。我可以为此使用什么 DStream api?也许 mapPartitions
?
我想说解决这个问题的最好方法是利用 stdlib flatMap
接受 Option
def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???
val validatedValues: DStream[MyCaseClass] =
values.map(validate).flatMap {
case Left(error) =>
logger.error(error)
None
case Right(record) =>
Some(record)
}
你也可以使用 mapPartitions
更冗长一点,这应该会更有效一点。
此处的 'best' 选项在一定程度上取决于您的 spark 作业的其余部分和您的 spark 版本。
理想情况下,您会选择催化剂本身能够理解的机制。 spark3 数据集 observe 侦听器可能就是您要找的东西。 (我还没有看到很多在野外使用它的例子,但它 似乎 这就是这种事情背后的动机。)
在纯 spark sql 中,一种选择是添加一个包含验证结果的新列,例如名为 invalid_reason
的列如果记录有效则为 NULL
或包含列验证失败原因的某些 [枚举] 字符串。此时,您可能希望在执行 groupBy/count/collect/log
操作之前 persist/cache 数据集,然后在持久化数据帧上过滤 where invalid_reason is null
并继续进行其余处理。
tl;dr:考虑添加验证列而不是仅应用 'validate' 函数。然后您 'fork' 在此处处理:记录指定了无效列的记录,在没有指定的记录上处理您的其余工作。它确实会为您的数据框增加一些容量,但不需要两次处理相同的记录。
我有一个 spark-streaming 应用程序,我想在我的主要操作之前进行一些数据转换,但转换涉及一些数据验证。
当验证失败时,我想记录失败案例,然后继续其余的。
目前,我有这样的代码:
def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???
val validationResults = values.map(validate)
validationResults.foreachRDD { rdd =>
rdd.foreach {
case Left(error) => logger.error(error)
case _ =>
}
}
val validatedValues: DStream[MyCaseClass] =
validationResults.mapPartitions { partition =>
partition.collect { case Right(record) => record }
}
这目前有效,但感觉我做错了什么。
问题
据我了解,这将执行 validation
两次,这可能是浪费。
- 使用
values.map(validation).persist()
来解决这个问题是否正确? - 即使我保留这些值,它仍然会在所有元素上进行两次迭代和模式匹配。感觉应该有一些方法可以用来解决这个问题。在常规的 Scala 集合中,我可能会使用一些猫
TraverseFilter
api,或者使用fs2.Stream
和evalMapFilter
。我可以为此使用什么 DStream api?也许mapPartitions
?
我想说解决这个问题的最好方法是利用 stdlib flatMap
接受 Option
def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???
val validatedValues: DStream[MyCaseClass] =
values.map(validate).flatMap {
case Left(error) =>
logger.error(error)
None
case Right(record) =>
Some(record)
}
你也可以使用 mapPartitions
更冗长一点,这应该会更有效一点。
此处的 'best' 选项在一定程度上取决于您的 spark 作业的其余部分和您的 spark 版本。
理想情况下,您会选择催化剂本身能够理解的机制。 spark3 数据集 observe 侦听器可能就是您要找的东西。 (我还没有看到很多在野外使用它的例子,但它 似乎 这就是这种事情背后的动机。)
在纯 spark sql 中,一种选择是添加一个包含验证结果的新列,例如名为 invalid_reason
的列如果记录有效则为 NULL
或包含列验证失败原因的某些 [枚举] 字符串。此时,您可能希望在执行 groupBy/count/collect/log
操作之前 persist/cache 数据集,然后在持久化数据帧上过滤 where invalid_reason is null
并继续进行其余处理。
tl;dr:考虑添加验证列而不是仅应用 'validate' 函数。然后您 'fork' 在此处处理:记录指定了无效列的记录,在没有指定的记录上处理您的其余工作。它确实会为您的数据框增加一些容量,但不需要两次处理相同的记录。