Spark:show 和 collect-println 给出不同的输出
Spark: show and collect-println giving different outputs
我正在使用 Spark 2.2
我觉得我这里发生了一些奇怪的事情。基本前提是
- 我有一组 KIE/Drools 规则 运行 通过配置文件对象的数据集
- 我然后尝试 show/collect-print 结果输出
- 然后我将输出转换为一个元组以便稍后对其进行平面映射
下面的代码
implicit val mapEncoder = Encoders.kryo[java.util.HashMap[String, Any]]
implicit val recommendationEncoder = Encoders.kryo[Recommendation]
val mapper = new ObjectMapper()
val kieOuts = uberDs.map(profile => {
val map = mapper.convertValue(profile, classOf[java.util.HashMap[String, Any]])
val profile = Profile(map)
// setup the kie session
val ks = KieServices.Factory.get
val kContainer = ks.getKieClasspathContainer
val kSession = kContainer.newKieSession() //TODO: stateful session, how to do stateless?
// insert profile object into kie session
val kCmds = ks.getCommands
val cmds = new java.util.ArrayList[Command[_]]()
cmds.add(kCmds.newInsert(profile))
cmds.add(kCmds.newFireAllRules("outFired"))
// fire kie rules
val results = kSession.execute(kCmds.newBatchExecution(cmds))
val fired = results.getValue("outFired").toString.toInt
// collect the inserted recommendation objects and create uid string
import scala.collection.JavaConversions._
var gresults = kSession.getObjects
gresults = gresults.drop(1) // drop the inserted profile object which also gets collected
val recommendations = scala.collection.mutable.ListBuffer[Recommendation]()
gresults.toList.foreach(reco => {
val recommendation = reco.asInstanceOf[Recommendation]
recommendations += recommendation
})
kSession.dispose
val uIds = StringBuilder.newBuilder
if(recommendations.size > 0) {
recommendations.foreach(recommendation => {
uIds.append(recommendation.getOfferId + "_" + recommendation.getScore)
uIds.append(";")
})
uIds.deleteCharAt(uIds.size - 1)
}
new ORecommendation(profile.getAttributes().get("cId").toString.toLong, fired, uIds.toString)
})
println("======================Output#1======================")
kieOuts.show(1000, false)
println("======================Output#2======================")
kieOuts.collect.foreach(println)
//separating cid and and each uid into individual rows
val kieOutsDs = kieOuts.as[(Long, Int, String)]
println("======================Output#3======================")
kieOutsDs.show(1000, false)
(我有 sanitized/shortened 下面的 id,它们更大但格式相似)
我看到的输出
输出#1 出现一组 uId(作为字符串)
+----+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cId |rulesFired | eligibleUIds |
|842 | 17|123-25_2.0;12345678-48_9.0;28a-ad_5.0;123-56_10.0;123-27_2.0;123-32_3.0;c6d-e5_5.0;123-26_2.0;123-51_10.0;8e8-c1_5.0;123-24_2.0;df8-ad_5.0;123-36_5.0;123-16_2.0;123-34_3.0|
+----+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
输出 #2 显示了大部分相似的 uId(通常相差 1 个元素)
ORecommendation(842,17,123-36_5.0;123-24_2.0;8e8-c1_5.0;df8-ad_5.0;28a-ad_5.0;660-73_5.0;123-34_3.0;123-48_9.0;123-16_2.0;123-51_10.0;123-26_2.0;c6d-e5_5.0;123-25_2.0;123-56_10.0;123-32_3.0)
输出#3 与#Output1 相同
+----+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|842 | 17 |123-32_3.0;8e8-c1_5.0;123-51_10.0;123-48_9.0;28a-ad_5.0;c6d-e5_5.0;123-27_2.0;123-16_2.0;123-24_2.0;123-56_10.0;123-34_3.0;123-36_5.0;123-6_2.0;123-25_2.0;660-73_5.0|
每次我 运行 Output#1 和 Output#2 之间的差异是 1 个元素但绝不是相同的元素(在上面的示例中,Output#1 有 123-27_2.0 但输出#2 有 660-73_5.0)
不应该是一样的吗?我对 Scala/Spark 还是个新手,感觉我缺少一些非常基础的东西
我想我明白了,将 cache 添加到 kieOuts 至少让我在 show 和 collect 之间得到相同的输出。
我将研究为什么 KIE 为相同输入的每个 运行 提供不同的输出,但这是一个不同的问题
我正在使用 Spark 2.2
我觉得我这里发生了一些奇怪的事情。基本前提是
- 我有一组 KIE/Drools 规则 运行 通过配置文件对象的数据集
- 我然后尝试 show/collect-print 结果输出
- 然后我将输出转换为一个元组以便稍后对其进行平面映射
下面的代码
implicit val mapEncoder = Encoders.kryo[java.util.HashMap[String, Any]]
implicit val recommendationEncoder = Encoders.kryo[Recommendation]
val mapper = new ObjectMapper()
val kieOuts = uberDs.map(profile => {
val map = mapper.convertValue(profile, classOf[java.util.HashMap[String, Any]])
val profile = Profile(map)
// setup the kie session
val ks = KieServices.Factory.get
val kContainer = ks.getKieClasspathContainer
val kSession = kContainer.newKieSession() //TODO: stateful session, how to do stateless?
// insert profile object into kie session
val kCmds = ks.getCommands
val cmds = new java.util.ArrayList[Command[_]]()
cmds.add(kCmds.newInsert(profile))
cmds.add(kCmds.newFireAllRules("outFired"))
// fire kie rules
val results = kSession.execute(kCmds.newBatchExecution(cmds))
val fired = results.getValue("outFired").toString.toInt
// collect the inserted recommendation objects and create uid string
import scala.collection.JavaConversions._
var gresults = kSession.getObjects
gresults = gresults.drop(1) // drop the inserted profile object which also gets collected
val recommendations = scala.collection.mutable.ListBuffer[Recommendation]()
gresults.toList.foreach(reco => {
val recommendation = reco.asInstanceOf[Recommendation]
recommendations += recommendation
})
kSession.dispose
val uIds = StringBuilder.newBuilder
if(recommendations.size > 0) {
recommendations.foreach(recommendation => {
uIds.append(recommendation.getOfferId + "_" + recommendation.getScore)
uIds.append(";")
})
uIds.deleteCharAt(uIds.size - 1)
}
new ORecommendation(profile.getAttributes().get("cId").toString.toLong, fired, uIds.toString)
})
println("======================Output#1======================")
kieOuts.show(1000, false)
println("======================Output#2======================")
kieOuts.collect.foreach(println)
//separating cid and and each uid into individual rows
val kieOutsDs = kieOuts.as[(Long, Int, String)]
println("======================Output#3======================")
kieOutsDs.show(1000, false)
(我有 sanitized/shortened 下面的 id,它们更大但格式相似)
我看到的输出
输出#1 出现一组 uId(作为字符串)
+----+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cId |rulesFired | eligibleUIds |
|842 | 17|123-25_2.0;12345678-48_9.0;28a-ad_5.0;123-56_10.0;123-27_2.0;123-32_3.0;c6d-e5_5.0;123-26_2.0;123-51_10.0;8e8-c1_5.0;123-24_2.0;df8-ad_5.0;123-36_5.0;123-16_2.0;123-34_3.0|
+----+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
输出 #2 显示了大部分相似的 uId(通常相差 1 个元素)
ORecommendation(842,17,123-36_5.0;123-24_2.0;8e8-c1_5.0;df8-ad_5.0;28a-ad_5.0;660-73_5.0;123-34_3.0;123-48_9.0;123-16_2.0;123-51_10.0;123-26_2.0;c6d-e5_5.0;123-25_2.0;123-56_10.0;123-32_3.0)
输出#3 与#Output1 相同
+----+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|842 | 17 |123-32_3.0;8e8-c1_5.0;123-51_10.0;123-48_9.0;28a-ad_5.0;c6d-e5_5.0;123-27_2.0;123-16_2.0;123-24_2.0;123-56_10.0;123-34_3.0;123-36_5.0;123-6_2.0;123-25_2.0;660-73_5.0|
每次我 运行 Output#1 和 Output#2 之间的差异是 1 个元素但绝不是相同的元素(在上面的示例中,Output#1 有 123-27_2.0 但输出#2 有 660-73_5.0)
不应该是一样的吗?我对 Scala/Spark 还是个新手,感觉我缺少一些非常基础的东西
我想我明白了,将 cache 添加到 kieOuts 至少让我在 show 和 collect 之间得到相同的输出。 我将研究为什么 KIE 为相同输入的每个 运行 提供不同的输出,但这是一个不同的问题