如何使用 Scala 在 Spark 中从列表或数组创建行
How to create a Row from a List or Array in Spark using Scala
我正在尝试根据用户输入创建行 (org.apache.spark.sql.catalyst.expressions.Row
)。我无法随机创建行。
是否有从 List
或 Array
.
创建行的功能
例如,如果我有一个具有以下格式的 .csv
文件,
"91xxxxxxxxxx,21.31,15,0,0"
如果用户输入 [1, 2]
那么我只需要取第二列和第三列以及第一列 customer_id
我尝试用代码解析它:
val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => (foo(input,r(0)))) `
其中 foo 定义为
def f(n: List[Int], s: String) : Row = {
val n = input.length
var out = new Array[Any](n+1)
var r = s.split(",")
out(0) = r(0)
for (i <- 1 to n)
out(i) = r(input(i-1)).toDouble
Row(out)
}
并且输入是一个列表说
val input = List(1,2)
执行此代码我得到 l3 为:
Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])
但我想要的是:
Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])`
必须传递此信息才能在 Spark 中创建模式 SQL
像下面这样的东西应该可以工作:
import org.apache.spark.sql._
def f(n: List[Int], s: String) : Row =
Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq)
您缺少 StructField 和 StructType 的创建。请参阅官方指南 http://spark.apache.org/docs/latest/sql-programming-guide.html,第 部分以编程方式指定架构
我不是 Scala 专家,但在 Python 中它看起来像这样:
from pyspark.sql import *
sqlContext = SQLContext(sc)
input = [1,2]
def parse(line):
global input
l = line.split(',')
res = [l[0]]
for ind in input:
res.append(l[ind])
return res
csv = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))
fieldnum = len(input) + 1
fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)
csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()
简而言之,您不应该直接将它们转换为 Row 对象,只需保留为 RDD 并使用 applySchema
对其应用模式即可
您也可以试试:
Row.fromSeq(line(0).toString ++ line(1).toDouble ++ line(2).toDouble ++ line.slice(2, line.size).map(value => value.toString))
我正在尝试根据用户输入创建行 (org.apache.spark.sql.catalyst.expressions.Row
)。我无法随机创建行。
是否有从 List
或 Array
.
例如,如果我有一个具有以下格式的 .csv
文件,
"91xxxxxxxxxx,21.31,15,0,0"
如果用户输入 [1, 2]
那么我只需要取第二列和第三列以及第一列 customer_id
我尝试用代码解析它:
val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => (foo(input,r(0)))) `
其中 foo 定义为
def f(n: List[Int], s: String) : Row = {
val n = input.length
var out = new Array[Any](n+1)
var r = s.split(",")
out(0) = r(0)
for (i <- 1 to n)
out(i) = r(input(i-1)).toDouble
Row(out)
}
并且输入是一个列表说
val input = List(1,2)
执行此代码我得到 l3 为:
Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])
但我想要的是:
Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])`
必须传递此信息才能在 Spark 中创建模式 SQL
像下面这样的东西应该可以工作:
import org.apache.spark.sql._
def f(n: List[Int], s: String) : Row =
Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq)
您缺少 StructField 和 StructType 的创建。请参阅官方指南 http://spark.apache.org/docs/latest/sql-programming-guide.html,第 部分以编程方式指定架构
我不是 Scala 专家,但在 Python 中它看起来像这样:
from pyspark.sql import *
sqlContext = SQLContext(sc)
input = [1,2]
def parse(line):
global input
l = line.split(',')
res = [l[0]]
for ind in input:
res.append(l[ind])
return res
csv = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))
fieldnum = len(input) + 1
fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)
csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()
简而言之,您不应该直接将它们转换为 Row 对象,只需保留为 RDD 并使用 applySchema
您也可以试试:
Row.fromSeq(line(0).toString ++ line(1).toDouble ++ line(2).toDouble ++ line.slice(2, line.size).map(value => value.toString))