Spark Scala 2.10 元组限制

Spark Scala 2.10 tuple limit

我有包含 66 列的 DataFrame 要处理(几乎每个列的值都需要以某种方式更改)所以我 运行 遵循语句

    val result = data.map(row=> (
        modify(row.getString(row.fieldIndex("XX"))),
        (...)
        )
    )

直到第 66 列。 由于此版本中的 scala 限制为最多 22 对元组,因此我不能那样执行。 问题是,有什么解决方法吗? 在所有行操作之后,我将其转换为具有特定列名的 df

   result.toDf("c1",...,"c66")
   result.storeAsTempTable("someFancyResult")

"modify"函数只是一个例子来说明我的观点

绕过它的方式非常繁琐,但它确实有效,请尝试使用此示例代码开始,您可以看到有超过 22 列被访问:

object SimpleApp {
  class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable {
    def canEqual(that: Any) = that.isInstanceOf[Record]

    def productArity = 24

    def productElement(n: Int) = n match {
      case 0 => x1
      case 1 => x2
      case 2 => x3
      ...
      case 23 => x24
    }
  }

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Product Test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc);

    val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x")

    import sqlContext._
    sc.parallelize(record :: Nil).registerAsTable("records")

    sql("SELECT x1 FROM records").collect()
  }
}

如果您所做的只是修改现有 DataFrame 的值,最好使用 UDF 而不是映射到 RDD:

import org.apache.spark.sql.functions.udf

val modifyUdf = udf(modify)
data.withColumn("c1", modifyUdf($"c1"))

如果由于某些原因上述内容不符合您的需要,您可以做的最简单的事情就是从 RDD[Row] 重新创建DataFrame。例如像这样:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType}


val result: RDD[Row] = data.map(row => {
  val buffer = ArrayBuffer.empty[Any]

  // Add value to buffer
  buffer.append(modify(row.getAs[String]("c1")))

  // ... repeat for other values

  // Build row
  Row.fromSeq(buffer)
})

// Create schema
val schema = StructType(Seq(
  StructField("c1", StringType, false),
  // ...  
  StructField("c66", StringType, false)
))

sqlContext.createDataFrame(result, schema)