Spark foreach 分区连接改进
Spark foreachpartition connection improvements
我写了一个 spark 作业,它执行以下操作
从 HDFS 文本文件读取数据。
执行 distinct() 调用以过滤重复项。
做一个mapToPair阶段并生成pairRDD
调用 reducebykey
对分组的元组执行聚合逻辑。
现在在 #5
上调用 foreach
这里是
- 调用 cassandra db
- 创建 aws SNS 和 SQS 客户端连接
- 做一些 json 记录格式化。
- 将记录发布到 SNS/SQS
当我 运行 这个工作时,它创建了三个火花阶段
第一阶段 - 将近 45 秒。执行一个独特的
第二阶段 - mapToPair 和 reducebykey = 需要 1.5 分钟
第三阶段 = 需要 19 分钟
我做了什么
- 我关闭了 cassandra 调用,以便查看数据库命中原因 - 这花费的时间更少
- 我发现有问题的部分是为每个分区创建 SNS/SQS 连接
它占用了整个工作时间的 60% 以上
我正在 foreachPartition 中创建 SNS/SQS 连接以减少连接数。我们有更好的方法吗
我无法在驱动程序上创建连接对象,因为它们不可序列化
我没有使用executor 9,executor core 15,driver memory 2g,executor memory 5g
我用的是16核64g内存
集群大小 1 master 9 slave 所有配置相同
EMR 部署 spark 1.6
听起来您想为每个节点设置一个 SNS/SQS 连接,然后用它来处理每个节点上的所有数据。
我认为 foreachPartition 在这里是正确的想法,但您可能希望事先合并您的 RDD。这将在不混洗的情况下折叠同一节点上的分区,并允许您避免启动额外的 SNS/SQS 连接。
我写了一个 spark 作业,它执行以下操作
从 HDFS 文本文件读取数据。
执行 distinct() 调用以过滤重复项。
做一个mapToPair阶段并生成pairRDD
调用 reducebykey
对分组的元组执行聚合逻辑。
现在在 #5
上调用 foreach这里是
- 调用 cassandra db
- 创建 aws SNS 和 SQS 客户端连接
- 做一些 json 记录格式化。
- 将记录发布到 SNS/SQS
当我 运行 这个工作时,它创建了三个火花阶段
第一阶段 - 将近 45 秒。执行一个独特的 第二阶段 - mapToPair 和 reducebykey = 需要 1.5 分钟
第三阶段 = 需要 19 分钟
我做了什么
- 我关闭了 cassandra 调用,以便查看数据库命中原因 - 这花费的时间更少
- 我发现有问题的部分是为每个分区创建 SNS/SQS 连接
它占用了整个工作时间的 60% 以上
我正在 foreachPartition 中创建 SNS/SQS 连接以减少连接数。我们有更好的方法吗
我无法在驱动程序上创建连接对象,因为它们不可序列化
我没有使用executor 9,executor core 15,driver memory 2g,executor memory 5g
我用的是16核64g内存 集群大小 1 master 9 slave 所有配置相同 EMR 部署 spark 1.6
听起来您想为每个节点设置一个 SNS/SQS 连接,然后用它来处理每个节点上的所有数据。
我认为 foreachPartition 在这里是正确的想法,但您可能希望事先合并您的 RDD。这将在不混洗的情况下折叠同一节点上的分区,并允许您避免启动额外的 SNS/SQS 连接。