How to avoid using collect() on a Spark RDD in Scala?
I have a List and have to create a Map from it for further use. I am using an RDD, but with collect() the job fails on the cluster. Any help is appreciated.
Please help. Below is the sample code, from the List through to rdd.collect().
I have to use this Map data further, but how do I use it without collect?
This code creates a Map from the RDD (List) data. List format -> (asdfg/1234/wert, asdf)
// List data to create the Map from
val listData = methodToGetListData(ListData).toList
// Create an RDD from the List above
val rdd = sparkContext.makeRDD(listData)
implicit val formats = Serialization.formats(NoTypeHints) // org.json4s
val res = rdd
  .map(map => (getRPath(map._1), getAttribute(map._1), map._2))
  .groupBy(_._1)
  .map(tuple => {
    Map(
      "P_Id" -> "1234",
      "R_Time" -> "27-04-2020",
      "S_Time" -> "27-04-2020",
      "r_path" -> tuple._1,
      "S_Tag" -> "12345",
      tuple._1 -> (tuple._2.map(a => (a._2, a._3)).toMap)
    )
  })
res.collect()
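For reference, the helpers called above (getRPath, getAttribute) are not defined in the snippet. A hypothetical sketch of what they might do, inferred only from the stated list format (asdfg/1234/wert, asdf) — the bodies are assumptions, not the asker's code:

  // Assumed: getRPath returns everything before the last '/' (e.g. "asdfg/1234")
  def getRPath(key: String): String =
    key.substring(0, key.lastIndexOf('/'))

  // Assumed: getAttribute returns the last path segment (e.g. "wert")
  def getAttribute(key: String): String =
    key.substring(key.lastIndexOf('/') + 1)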
Q: How can the data be used without collect?
Answer: collect() moves all of the data to the driver node. If the data is huge, never do that.
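If the maps are only consumed by further distributed processing, one option (a minimal sketch, not taken from the code above; the output path is illustrative) is to never bring them to the driver at all:

  // Keep res as an RDD: chain further transformations instead of collecting.
  val enriched = res.map(m => m + ("processed" -> "true"))

  // Or persist to distributed storage for a downstream job to pick up.
  enriched.saveAsTextFile("hdfs:///tmp/res_maps") // illustrative path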
I don't know exactly what your use case for preparing the map is, but it can be achieved with a built-in Spark API, namely collectionAccumulator... in detail:
collectionAccumulator[scala.collection.mutable.Map[String, String]]
Assume this is your sample DataFrame, from which you want to make the map.
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909 |1234 |Cables-1 |23-12-2020 |LC |Installed |ABCD1234 |0 |Cables |ASDF123 |12345 |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111 |Cables-11 |23-12-2022 |LC1 |Installed1 |ABCD12341 |0 |Cables1 |ASDF1231 |123451 |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
From this you want to make a map (a nested map; in your example I used the nested-map key name as a prefix), then...
Below is the full example; have a look and modify it accordingly.
package examples

import org.apache.log4j.Level

object GrabMapbetweenClosure extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  // Accumulator collecting one mutable Map per record: executors add,
  // the driver reads the merged value after the action completes.
  val mutableMapAcc = spark.sparkContext
    .collectionAccumulator[scala.collection.mutable.Map[String, String]]("mutableMap")

  val df = Seq(
    ("-0909", "1234", "Cables-1", "23-12-2020", "LC", "Installed", "ABCD1234",
      "0", "Cables", "ASDF123", "12345", "Start~>HInfo->Cables->Cables-1"),
    ("-09091", "1234111", "Cables-11", "23-12-2022", "LC1", "Installed1", "ABCD12341",
      "0", "Cables1", "ASDF1231", "123451", "Start~>HInfo->Cables->Cables-11")
  ).toDF("Item_Id", "Parent_Id", "object_class_instance", "Received_Time", "CablesName",
    "CablesStatus", "CablesHInfoID", "CablesIndex", "object_class", "ServiceTag",
    "Scan_Time", "relation_tree")

  df.show(false)

  df.foreachPartition { partition => // foreachPartition for performance's sake
    partition.foreach { record =>
      mutableMapAcc.add(scala.collection.mutable.Map(
        "Item_Id" -> record.getAs[String]("Item_Id"),
        "CablesStatus" -> record.getAs[String]("CablesStatus"),
        "CablesHInfoID" -> record.getAs[String]("CablesHInfoID"),
        "Parent_Id" -> record.getAs[String]("Parent_Id"),
        "CablesIndex" -> record.getAs[String]("CablesIndex"),
        "object_class_instance" -> record.getAs[String]("object_class_instance"),
        "Received_Time" -> record.getAs[String]("Received_Time"),
        "object_class" -> record.getAs[String]("object_class"),
        "CablesName" -> record.getAs[String]("CablesName"),
        "ServiceTag" -> record.getAs[String]("ServiceTag"),
        "Scan_Time" -> record.getAs[String]("Scan_Time"),
        "relation_tree" -> record.getAs[String]("relation_tree")
      ))
    }
  }

  println("FinalMap : " + mutableMapAcc.value.toString)
}
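A note on reading the result back: mutableMapAcc.value returns a java.util.List[scala.collection.mutable.Map[String, String]], so for idiomatic further use on the driver you would convert it to a Scala collection. A minimal sketch, assuming Scala 2.12 (scala.collection.JavaConverters):

  import scala.collection.JavaConverters._

  // value is a java.util.List; convert to a Scala Seq for further use
  val allMaps: Seq[scala.collection.mutable.Map[String, String]] =
    mutableMapAcc.value.asScala.toSeq

  // e.g. index the rows by Item_Id for quick lookup
  val byItemId = allMaps.flatMap(m => m.get("Item_Id").map(_ -> m)).toMap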
Result:
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909 |1234 |Cables-1 |23-12-2020 |LC |Installed |ABCD1234 |0 |Cables |ASDF123 |12345 |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111 |Cables-11 |23-12-2022 |LC1 |Installed1 |ABCD12341 |0 |Cables1 |ASDF1231 |123451 |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
FinalMap : [Map(Scan_Time -> 123451, ServiceTag -> ASDF1231, Received_Time -> 23-12-2022, object_class_instance -> Cables-11, CablesHInfoID -> ABCD12341, Parent_Id -> 1234111, Item_Id -> -09091, CablesIndex -> 0, object_class -> Cables1, relation_tree -> Start~>HInfo->Cables->Cables-11, CablesName -> LC1, CablesStatus -> Installed1), Map(Scan_Time -> 12345, ServiceTag -> ASDF123, Received_Time -> 23-12-2020, object_class_instance -> Cables-1, CablesHInfoID -> ABCD1234, Parent_Id -> 1234, Item_Id -> -0909, CablesIndex -> 0, object_class -> Cables, relation_tree -> Start~>HInfo->Cables->Cables-1, CablesName -> LC, CablesStatus -> Installed)]