Spark:如何在映射转换中构造一系列副作用操作以避免重复?

Spark: How to structure a series of side effect actions inside mapping transformation to avoid repetition?

我有一个 Spark Streaming 应用程序需要执行以下步骤:

  1. 获取一个字符串,对其应用一些映射转换
  2. 再次映射:如果这个字符串(现在是一个数组)中有特定值,立即发送电子邮件(或在 spark 环境之外做一些事情)
  3. 收集()并保存在特定目录中
  4. 应用一些其他的 transformation/enrichment
  5. 收集()并保存在另一个目录中。

如您所见,这意味着延迟激活的计算会执行两次 OUTSIDE 操作。我试图避免缓存,因为每秒数百行这会杀死我的服务器。
还试图维护操作顺序,尽管这不是那么重要:有没有我不知道的解决方案?

编辑:我现在的程序:

kafkaStream;
lines = take the value, discard the topic;
lines.foreachRDD{
    splittedRDD = arg.map { split the string };
    assRDD = splittedRDD.map { associate to a table };
    flaggedRDD = assRDD.map { add a boolean parameter under a if condition + send mail};
    externalClass.saveStaticMethod( flaggedRDD.collect() and save in file);
    enrichRDD = flaggedRDD.map { enrich with external data };
    externalClass.saveStaticMethod( enrichRDD.collect() and save in file);
}

我把保存的部分放在邮件后面,这样如果有什么问题至少邮件已经发送了。

我找到的最后两种方法是:

  1. 在副作用之前的 DStream 转换中,复制 Dstream:一个将继续转换,另一个将具有 .foreachRDD{ outside action }。这没有什么大的缺点,因为它只是在工作节点上多了一个 RDD。
  2. 从转换中提取 {outside action} 并映射已发送的邮件:如果邮件已发送则进行过滤。这几乎是一个多余的操作,因为它将过滤掉所有的 RDD 元素。
  3. 继续之前的缓存(虽然我试图避免它,但没什么可做的)

如果尝试不缓存,解决方案 1 是可行的方法