更新对象的 RDD

Updating RDD of Objects

我刚刚开始学习 Scala 并面临一些有关对象 RDD 操作的问题。

我遇到了与下面所述相同的问题link

对于上述link的问题,还有其他方法可以解决吗?也可以使用数据集或数据框来实现我们正在尝试做的事情吗?

不变性是函数式编程的关键概念之一。您无法更改 RDD 或其中的数据,但您可以根据旧 RDD 的数据创建新的 RDD

我修改了您问题中 link 中的示例,以说明这种转换通常是什么样子的。

//just case class with foo and bar fields that can be empty.
case class Test (foo: Option[Double], bar: Option[Double], someOtherVal: String)

// as you can see this is not actually "update"
// it creates new Test with "updated" foo and bar fields 
// NOTE: this logic usually lives outside data object 
def updateFooBar(t: Test) = Test(Some(Math.random()), Some(Math.random()),t.someOtherVal)


val testList = Array.fill(5)(Test(None,None,"someString"))
val testRDD = sc.parallelize(testList)

//creates new RDD based on old one by applying updateFooBar to each element. 
val newRdd = testRDD.map{ x => updateFooBar(x) }
//or just  val newRdd = testRDD.map(updateFooBar)

newRdd.collect().foreach { x=> println(x.foo+"~"+x.bar+"~"+x.someOtherVal) }

您可以使用与 RDD 完全相同的方式转换 Dataset:

val newDs = testRDD.toDS().map( x => updateFooBar(x))

或使用Dataframe:

import org.apache.spark.sql.functions.typedLit

val newDf = testRDD.toDF()
  .withColumn("foo",typedLit(Some(Math.random())))
  .withColumn("bar",typedLit(Some(Math.random())))