Spark foreach 分区连接改进

Spark foreachpartition connection improvements

我写了一个 spark 作业,它执行以下操作

  1. 从 HDFS 文本文件读取数据。

  2. 执行 distinct() 调用以过滤重复项。

  3. 做一个mapToPair阶段并生成pairRDD

  4. 调用 reducebykey

  5. 对分组的元组执行聚合逻辑。

  6. 现在在 #5

    上调用 foreach

    这里是

    1. 调用 cassandra db
    2. 创建 aws SNS 和 SQS 客户端连接
    3. 做一些 json 记录格式化。
    4. 将记录发布到 SNS/SQS

当我 运行 这个工作时,它创建了三个火花阶段

第一阶段 - 将近 45 秒。执行一个独特的 第二阶段 - mapToPair 和 reducebykey = 需要 1.5 分钟

第三阶段 = 需要 19 分钟

我做了什么

  1. 我关闭了 cassandra 调用,以便查看数据库命中原因 - 这花费的时间更少
  2. 我发现有问题的部分是为每个分区创建 SNS/SQS 连接

它占用了整个工作时间的 60% 以上

我正在 foreachPartition 中创建 SNS/SQS 连接以减少连接数。我们有更好的方法吗

我无法在驱动程序上创建连接对象,因为它们不可序列化

我没有使用executor 9,executor core 15,driver memory 2g,executor memory 5g

我用的是16核64g内存 集群大小 1 master 9 slave 所有配置相同 EMR 部署 spark 1.6

听起来您想为每个节点设置一个 SNS/SQS 连接,然后用它来处理每个节点上的所有数据。

我认为 foreachPartition 在这里是正确的想法,但您可能希望事先合并您的 RDD。这将在不混洗的情况下折叠同一节点上的分区,并允许您避免启动额外的 SNS/SQS 连接。

看这里: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@coalesce(numPartitions:Int,shuffle:Boolean,partitionCoalescer:Option[org.apache.spark.rdd.PartitionCoalescer])(implicitord:Ordering[T]):org.apache.spark.rdd.RDD[T]