Spark:如何在映射转换中构造一系列副作用操作以避免重复?
Spark: How to structure a series of side effect actions inside mapping transformation to avoid repetition?
我有一个 Spark Streaming 应用程序需要执行以下步骤:
- 获取一个字符串,对其应用一些映射转换
- 再次映射:如果这个字符串(现在是一个数组)中有特定值,立即发送电子邮件(或在 spark 环境之外做一些事情)
- 收集()并保存在特定目录中
- 应用一些其他的 transformation/enrichment
- 收集()并保存在另一个目录中。
如您所见,这意味着延迟激活的计算会执行两次 OUTSIDE 操作。我试图避免缓存,因为每秒数百行这会杀死我的服务器。
还试图维护操作顺序,尽管这不是那么重要:有没有我不知道的解决方案?
编辑:我现在的程序:
kafkaStream;
lines = take the value, discard the topic;
lines.foreachRDD{
splittedRDD = arg.map { split the string };
assRDD = splittedRDD.map { associate to a table };
flaggedRDD = assRDD.map { add a boolean parameter under a if condition + send mail};
externalClass.saveStaticMethod( flaggedRDD.collect() and save in file);
enrichRDD = flaggedRDD.map { enrich with external data };
externalClass.saveStaticMethod( enrichRDD.collect() and save in file);
}
我把保存的部分放在邮件后面,这样如果有什么问题至少邮件已经发送了。
我找到的最后两种方法是:
- 在副作用之前的 DStream 转换中,复制 Dstream:一个将继续转换,另一个将具有
.foreachRDD{ outside action }
。这没有什么大的缺点,因为它只是在工作节点上多了一个 RDD。
- 从转换中提取
{outside action}
并映射已发送的邮件:如果邮件已发送则进行过滤。这几乎是一个多余的操作,因为它将过滤掉所有的 RDD 元素。
- 继续之前的缓存(虽然我试图避免它,但没什么可做的)
如果尝试不缓存,解决方案 1 是可行的方法
我有一个 Spark Streaming 应用程序需要执行以下步骤:
- 获取一个字符串,对其应用一些映射转换
- 再次映射:如果这个字符串(现在是一个数组)中有特定值,立即发送电子邮件(或在 spark 环境之外做一些事情)
- 收集()并保存在特定目录中
- 应用一些其他的 transformation/enrichment
- 收集()并保存在另一个目录中。
如您所见,这意味着延迟激活的计算会执行两次 OUTSIDE 操作。我试图避免缓存,因为每秒数百行这会杀死我的服务器。
还试图维护操作顺序,尽管这不是那么重要:有没有我不知道的解决方案?
编辑:我现在的程序:
kafkaStream;
lines = take the value, discard the topic;
lines.foreachRDD{
splittedRDD = arg.map { split the string };
assRDD = splittedRDD.map { associate to a table };
flaggedRDD = assRDD.map { add a boolean parameter under a if condition + send mail};
externalClass.saveStaticMethod( flaggedRDD.collect() and save in file);
enrichRDD = flaggedRDD.map { enrich with external data };
externalClass.saveStaticMethod( enrichRDD.collect() and save in file);
}
我把保存的部分放在邮件后面,这样如果有什么问题至少邮件已经发送了。
我找到的最后两种方法是:
- 在副作用之前的 DStream 转换中,复制 Dstream:一个将继续转换,另一个将具有
.foreachRDD{ outside action }
。这没有什么大的缺点,因为它只是在工作节点上多了一个 RDD。 - 从转换中提取
{outside action}
并映射已发送的邮件:如果邮件已发送则进行过滤。这几乎是一个多余的操作,因为它将过滤掉所有的 RDD 元素。 - 继续之前的缓存(虽然我试图避免它,但没什么可做的)
如果尝试不缓存,解决方案 1 是可行的方法