Spark Scala 2.10 元组限制
Spark Scala 2.10 tuple limit
我有包含 66 列的 DataFrame 要处理(几乎每个列的值都需要以某种方式更改)所以我 运行 遵循语句
val result = data.map(row=> (
modify(row.getString(row.fieldIndex("XX"))),
(...)
)
)
直到第 66 列。
由于此版本中的 scala 限制为最多 22 对元组,因此我不能那样执行。
问题是,有什么解决方法吗?
在所有行操作之后,我将其转换为具有特定列名的 df
result.toDf("c1",...,"c66")
result.storeAsTempTable("someFancyResult")
"modify"函数只是一个例子来说明我的观点
绕过它的方式非常繁琐,但它确实有效,请尝试使用此示例代码开始,您可以看到有超过 22 列被访问:
object SimpleApp {
class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable {
def canEqual(that: Any) = that.isInstanceOf[Record]
def productArity = 24
def productElement(n: Int) = n match {
case 0 => x1
case 1 => x2
case 2 => x3
...
case 23 => x24
}
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Product Test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);
val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x")
import sqlContext._
sc.parallelize(record :: Nil).registerAsTable("records")
sql("SELECT x1 FROM records").collect()
}
}
如果您所做的只是修改现有 DataFrame
的值,最好使用 UDF 而不是映射到 RDD:
import org.apache.spark.sql.functions.udf
val modifyUdf = udf(modify)
data.withColumn("c1", modifyUdf($"c1"))
如果由于某些原因上述内容不符合您的需要,您可以做的最简单的事情就是从 RDD[Row]
重新创建DataFrame
。例如像这样:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType}
val result: RDD[Row] = data.map(row => {
val buffer = ArrayBuffer.empty[Any]
// Add value to buffer
buffer.append(modify(row.getAs[String]("c1")))
// ... repeat for other values
// Build row
Row.fromSeq(buffer)
})
// Create schema
val schema = StructType(Seq(
StructField("c1", StringType, false),
// ...
StructField("c66", StringType, false)
))
sqlContext.createDataFrame(result, schema)
我有包含 66 列的 DataFrame 要处理(几乎每个列的值都需要以某种方式更改)所以我 运行 遵循语句
val result = data.map(row=> (
modify(row.getString(row.fieldIndex("XX"))),
(...)
)
)
直到第 66 列。 由于此版本中的 scala 限制为最多 22 对元组,因此我不能那样执行。 问题是,有什么解决方法吗? 在所有行操作之后,我将其转换为具有特定列名的 df
result.toDf("c1",...,"c66")
result.storeAsTempTable("someFancyResult")
"modify"函数只是一个例子来说明我的观点
绕过它的方式非常繁琐,但它确实有效,请尝试使用此示例代码开始,您可以看到有超过 22 列被访问:
object SimpleApp {
class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable {
def canEqual(that: Any) = that.isInstanceOf[Record]
def productArity = 24
def productElement(n: Int) = n match {
case 0 => x1
case 1 => x2
case 2 => x3
...
case 23 => x24
}
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Product Test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);
val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x")
import sqlContext._
sc.parallelize(record :: Nil).registerAsTable("records")
sql("SELECT x1 FROM records").collect()
}
}
如果您所做的只是修改现有 DataFrame
的值,最好使用 UDF 而不是映射到 RDD:
import org.apache.spark.sql.functions.udf
val modifyUdf = udf(modify)
data.withColumn("c1", modifyUdf($"c1"))
如果由于某些原因上述内容不符合您的需要,您可以做的最简单的事情就是从 RDD[Row]
重新创建DataFrame
。例如像这样:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType}
val result: RDD[Row] = data.map(row => {
val buffer = ArrayBuffer.empty[Any]
// Add value to buffer
buffer.append(modify(row.getAs[String]("c1")))
// ... repeat for other values
// Build row
Row.fromSeq(buffer)
})
// Create schema
val schema = StructType(Seq(
StructField("c1", StringType, false),
// ...
StructField("c66", StringType, false)
))
sqlContext.createDataFrame(result, schema)