试图在 Scala 中并行化嵌套循环
Attemping to parallelize a nested loop in Scala
我正在使用嵌套循环和外部 jar 比较 scala/spark 中的 2 个数据帧。
for (nrow <- dfm.rdd.collect) {
var mid = nrow.mkString(",").split(",")(0)
var mfname = nrow.mkString(",").split(",")(1)
var mlname = nrow.mkString(",").split(",")(2)
var mlssn = nrow.mkString(",").split(",")(3)
for (drow <- dfn.rdd.collect) {
var nid = drow.mkString(",").split(",")(0)
var nfname = drow.mkString(",").split(",")(1)
var nlname = drow.mkString(",").split(",")(2)
var nlssn = drow.mkString(",").split(",")(3)
val fNameArray = Array(mfname,nfname)
val lNameArray = Array (mlname,nlname)
val ssnArray = Array (mlssn,nlssn)
val fnamescore = Main.resultSet(fNameArray)
val lnamescore = Main.resultSet(lNameArray)
val ssnscore = Main.resultSet(ssnArray)
val overallscore = (fnamescore +lnamescore +ssnscore) /3
if(overallscore >= .95) {
println("MeditechID:".concat(mid)
.concat(" MeditechFname:").concat(mfname)
.concat(" MeditechLname:").concat(mlname)
.concat(" MeditechSSN:").concat(mlssn)
.concat(" NextGenID:").concat(nid)
.concat(" NextGenFname:").concat(nfname)
.concat(" NextGenLname:").concat(nlname)
.concat(" NextGenSSN:").concat(nlssn)
.concat(" FnameScore:").concat(fnamescore.toString)
.concat(" LNameScore:").concat(lnamescore.toString)
.concat(" SSNScore:").concat(ssnscore.toString)
.concat(" OverallScore:").concat(overallscore.toString))
}
}
}
我希望做的是在外循环中添加一些并行性,这样我就可以创建一个 5 线程池,并从外循环的集合中提取 5 条记录,并将它们与内循环的集合进行比较,而不是连续执行此操作。所以结果是我可以指定线程数,在任何给定时间针对内循环中的集合从外循环的集合处理中获得 5 条记录。我该怎么做?
这不是迭代 spark 数据帧的正确方法。主要关注的是 dfm.rdd.collect
。如果数据帧任意大,您最终会出现异常。这是因为 collect
函数本质上是将所有数据带入主节点。
替代方法是使用 rdd 的 foreach 或 map 结构。
dfm.rdd.foreach(x => {
// your logic
}
现在您正在尝试在此处迭代第二个数据框。恐怕这是不可能的。优雅的方法是加入 dfm
和 dfn
并迭代生成的数据集以计算您的函数。
让我们先分析一下你在做什么。您将 dfm
的数据收集到 driver。然后,对于从 dfn
收集数据的每个元素,对其进行转换并计算每对元素的分数。
这在很多方面都有问题。首先,即使不考虑并行计算,dfn
的元素的变换次数与 dfm
的元素一样多。此外,您为 dfm
的每一行收集 dfn
的数据。这是很多网络通信(在 driver 和执行者之间)。
如果你想使用 spark 来并行化你的计算,你需要使用 API(RDD,SQL 或数据集)。您似乎想使用 RDD 来执行笛卡尔积(这是 O(N*M) 所以要小心,这可能需要一段时间)。
让我们先在 笛卡尔积之前转换数据,以避免每个元素多次执行它们。另外,为了清楚起见,我们定义一个 case class 来包含您的数据,以及一个将您的数据帧转换为该 case class.
的 RDD 的函数
case class X(id : String, fname : String, lname : String, lssn : String)
def toRDDofX(df : DataFrame) = {
df.rdd.map(row => {
// using pattern matching to convert the array to the case class X
row.mkString(",").split(",") match {
case Array(a, b, c, d) => X(a, b, c, d)
}
})
}
然后,我使用 filter
只保留分数超过 .95
的元组,但您可以使用 map
、foreach
... 取决于什么你打算做什么。
val rddn = toRDDofX(dfn)
val rddm = toRDDofX(dfm)
rddn.cartesian(rddm).filter{ case (xn, xm) => {
val fNameArray = Array(xm.fname,xn.fname)
val lNameArray = Array(xm.lname,xn.lname)
val ssnArray = Array(xm.lssn,xn.lssn)
val fnamescore = Main.resultSet(fNameArray)
val lnamescore = Main.resultSet(lNameArray)
val ssnscore = Main.resultSet(ssnArray)
val overallscore = (fnamescore +lnamescore +ssnscore) /3
// and then, let's say we filter by score
overallscore > .95
}}
我正在使用嵌套循环和外部 jar 比较 scala/spark 中的 2 个数据帧。
for (nrow <- dfm.rdd.collect) {
var mid = nrow.mkString(",").split(",")(0)
var mfname = nrow.mkString(",").split(",")(1)
var mlname = nrow.mkString(",").split(",")(2)
var mlssn = nrow.mkString(",").split(",")(3)
for (drow <- dfn.rdd.collect) {
var nid = drow.mkString(",").split(",")(0)
var nfname = drow.mkString(",").split(",")(1)
var nlname = drow.mkString(",").split(",")(2)
var nlssn = drow.mkString(",").split(",")(3)
val fNameArray = Array(mfname,nfname)
val lNameArray = Array (mlname,nlname)
val ssnArray = Array (mlssn,nlssn)
val fnamescore = Main.resultSet(fNameArray)
val lnamescore = Main.resultSet(lNameArray)
val ssnscore = Main.resultSet(ssnArray)
val overallscore = (fnamescore +lnamescore +ssnscore) /3
if(overallscore >= .95) {
println("MeditechID:".concat(mid)
.concat(" MeditechFname:").concat(mfname)
.concat(" MeditechLname:").concat(mlname)
.concat(" MeditechSSN:").concat(mlssn)
.concat(" NextGenID:").concat(nid)
.concat(" NextGenFname:").concat(nfname)
.concat(" NextGenLname:").concat(nlname)
.concat(" NextGenSSN:").concat(nlssn)
.concat(" FnameScore:").concat(fnamescore.toString)
.concat(" LNameScore:").concat(lnamescore.toString)
.concat(" SSNScore:").concat(ssnscore.toString)
.concat(" OverallScore:").concat(overallscore.toString))
}
}
}
我希望做的是在外循环中添加一些并行性,这样我就可以创建一个 5 线程池,并从外循环的集合中提取 5 条记录,并将它们与内循环的集合进行比较,而不是连续执行此操作。所以结果是我可以指定线程数,在任何给定时间针对内循环中的集合从外循环的集合处理中获得 5 条记录。我该怎么做?
这不是迭代 spark 数据帧的正确方法。主要关注的是 dfm.rdd.collect
。如果数据帧任意大,您最终会出现异常。这是因为 collect
函数本质上是将所有数据带入主节点。
替代方法是使用 rdd 的 foreach 或 map 结构。
dfm.rdd.foreach(x => {
// your logic
}
现在您正在尝试在此处迭代第二个数据框。恐怕这是不可能的。优雅的方法是加入 dfm
和 dfn
并迭代生成的数据集以计算您的函数。
让我们先分析一下你在做什么。您将 dfm
的数据收集到 driver。然后,对于从 dfn
收集数据的每个元素,对其进行转换并计算每对元素的分数。
这在很多方面都有问题。首先,即使不考虑并行计算,dfn
的元素的变换次数与 dfm
的元素一样多。此外,您为 dfm
的每一行收集 dfn
的数据。这是很多网络通信(在 driver 和执行者之间)。
如果你想使用 spark 来并行化你的计算,你需要使用 API(RDD,SQL 或数据集)。您似乎想使用 RDD 来执行笛卡尔积(这是 O(N*M) 所以要小心,这可能需要一段时间)。
让我们先在 笛卡尔积之前转换数据,以避免每个元素多次执行它们。另外,为了清楚起见,我们定义一个 case class 来包含您的数据,以及一个将您的数据帧转换为该 case class.
的 RDD 的函数case class X(id : String, fname : String, lname : String, lssn : String)
def toRDDofX(df : DataFrame) = {
df.rdd.map(row => {
// using pattern matching to convert the array to the case class X
row.mkString(",").split(",") match {
case Array(a, b, c, d) => X(a, b, c, d)
}
})
}
然后,我使用 filter
只保留分数超过 .95
的元组,但您可以使用 map
、foreach
... 取决于什么你打算做什么。
val rddn = toRDDofX(dfn)
val rddm = toRDDofX(dfm)
rddn.cartesian(rddm).filter{ case (xn, xm) => {
val fNameArray = Array(xm.fname,xn.fname)
val lNameArray = Array(xm.lname,xn.lname)
val ssnArray = Array(xm.lssn,xn.lssn)
val fnamescore = Main.resultSet(fNameArray)
val lnamescore = Main.resultSet(lNameArray)
val ssnscore = Main.resultSet(ssnArray)
val overallscore = (fnamescore +lnamescore +ssnscore) /3
// and then, let's say we filter by score
overallscore > .95
}}