Spark 中自定义对象的 RDD 问题

Issue with RDD of Custom Objects in Spark

我正在像这样将类型传递到平面图中;

 val customData: RDD[(Text/String, Custom)] = origRDD.flatMap { case(x) => parse(x)}

这是returnsString和Custom的键值对(我在String上用了Text也没有用,就是'Text/String'的解释)。这个 class 扩展了 Serializable 并在 Kryo 中注册。

当我尝试 运行 程序时,它 只是 运行 并且永远不会结束。 永远不会结束我的意思是我已经离开它 运行宁 18 小时,它没有完成。如果我 将其更改为带有 Int(计数器)而不是自定义对象的 Text(hadoop io),它会很快完成 。当我说快时,我的意思是 30 分钟。它 运行ning 通过的数据是相同的数据,并且两者都使用了解析(以及平面图)方法,因此它 运行ning 通过完全相同的逻辑。它在平面图中使用的方法与解析方法相同。当我将它从 (Text, Int).

更改为 (Text/String, Custom) 时,行为会下降

我想知道我需要添加什么才能完成这项工作。它需要是可写的吗?

自定义对象示例 class 实现(显然不准确,但模仿得很好);

class Custom(dateAsLong: java.lang.Long, typeOfTransaction: util.HashSet[java.lang.Long], isCustomer: Boolean, amount: String, customerMatch: ObjectMatch) extends Serializable {
//has getters and setters here 

 val startDate = dateAsLong
 val transType = typeOfTransaction
 val customer = isCustomer
 val cost = amount
 val matchedData = customerMatch

 def getStartDate(): java.lang.Long = startDate
 def getTransType(): util.HashSet[java.lang.Long] = transType
 def getCustomer(): Boolean = customer
 def getCost(): String = amount
 def getMatchedData(): ObjectMatch = matchedData
}

扩展 java 可序列化的对象内部的解析方法示例;

object Paser extends Serializable { 
    def parse(transaction: Transaction, customerList: util.HashMap[String, String], storeList: util.HashMap[String, String]): List[(Text, Custom)] ={ //list because flatmap emits 0, 1 or 2 
//adds items to the list depending on conditions
    var list:List[(Text, Custom)] = List()
    val typeOfTransaction = getType(transaction)
    val dateAsLong = getDate(transaction)
    val amount = getAmount(transaction)
    val customerMatch = getCustomerMatch(transaction, customerList)
    val storeMatch = getStoreMatch(transaction, storeList)
     //more fields parsed

    if (customerMatch != Some(null)){
       isCustomer = true
       val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction,      isCustomer, amount, customerMatch)
       val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
       list = list :+ (new Text(transactionId), transaction)

    }  
    if (storeMatch != Some(null)){
       isCustomer = false
       val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction,      isCustomer, typeOfTransaction, storeMatch)
       val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
       list = list :+ (new Text(transactionId), transaction)
    }
  }
 list
}

Kryo序列化是这样的;

 conf.registerKryoClasses(Array(classOf[Custom]))

代码示例或示例链接可提供任何帮助。

Spark UI 用于(Text/String,自定义)运行

1/11 任务的底部进度是平面图,顶部是 saveAsNewHadoopAPIFile

平面图阶段 0 是 saveAsNewHadoopAPIFile -> filter x7 -> flatmap

运行 与 (Text, Int)

慢 运行(Text/String,自定义)运行 说 1.1 小时,但我已经让它 运行 18 小时。当它 运行 持续 18 小时时,它会慢慢通过,但是让它 运行 一天并不理想。有什么地方不对劲,非常不对劲。同样,两者都使用了 parse 方法,因此即使更快的 运行 没有输出自定义值,它也通过完全相同的逻辑 运行ning,而是输出键 Text 和 int.

不确定这是否有帮助,但不管是什么错误导致 Accumulo 中的扫描也出现不同。文本、Int 运行 的扫描量正常增加,在 30 分钟内保持相对相同 scans/s,然后下降。当我 运行 与习俗它增加然后立即下降。以类似 ^ 的方式,然后以较低的扫描速率拖延数小时。

Spark 版本:1.6.2,Scala 2.11

你不应该使用 var list:List[(Text, Custom)] = List()。每次执行代码 list = list :+ (new Text(transactionId), transaction) 都会创建一个新列表(它不仅仅是添加到现有列表中)。 List 在 Scala 中是不可变的。您应该将其替换为 val myListBuffer = ListBuffer[(Text, Custom)]()。我不确定这是否是唯一的问题 - 但如果您的列表很大,此更改应该会有所帮助。

此外,关于在 Scala 中编写代码的一些评论 - 在 Scala 中不需要 getter 和 setter class。无论如何,所有成员都是不可变的。在 Scala 中使用 var 之前,您需要认真思考。不变性将使您的代码具有弹性,提高可读性,并且更容易更改。