How many ways are there to add a new column to a DataFrame in the Spark API?
I can think of one, using withColumn():

val df = existingDF.withColumn("newcolname", existingDF("somecol") + 1)

But how do I generalize this to text data? For example, my DataFrame has a
string value, say "This is an example of a string", and I want to extract the
first and last words, as in val arraystring: Array[String] = Array(first, last).
from pyspark.sql import Row

# Create a simple DataFrame, stored into a partition directory
df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))
                                 .map(lambda i: Row(single=i, double=i * 2)))
df1.save("data/test_table/key=1", "parquet")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))
                                 .map(lambda i: Row(single=i, triple=i * 3)))
df2.save("data/test_table/key=2", "parquet")

# Read the partitioned table; Spark merges the two partition schemas
df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()
https://spark.apache.org/docs/1.3.1/sql-programming-guide.html
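Not from the linked docs, but as a hedged follow-up to the example above: after schema merging, df3 should expose the union of both partition schemas, with nulls where a partition lacked a column. For instance:

# single appears in both partitions; double and triple each exist in only
# one, so rows from the other partition read as null for those columns.
df3.select("single", "double", "triple", "key").show()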
Is this what you're looking for?
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{udf, col}

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Wrap plain Scala functions as UDFs so they can be applied to Columns
val extractFirstWord = udf((sentence: String) => sentence.split(" ").head)
val extractLastWord = udf((sentence: String) => sentence.split(" ").reverse.head)
val sentences = sc.parallelize(Seq("This is an example", "And this is another one", "One_word", "")).toDF("sentence")
val splits = sentences
.withColumn("first_word", extractFirstWord(col("sentence")))
.withColumn("last_word", extractLastWord(col("sentence")))
splits.show()
The output is then:
+--------------------+----------+---------+
| sentence|first_word|last_word|
+--------------------+----------+---------+
| This is an example| This| example|
|And this is anoth...| And| one|
| One_word| One_word| One_word|
| | | |
+--------------------+----------+---------+
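As an aside, on newer Spark versions (2.4+; this goes beyond the SQLContext-era API used above) the same two columns can be derived without a UDF, using the built-in split and element_at functions. A minimal sketch:

import org.apache.spark.sql.functions.{split, col, element_at}

// element_at uses 1-based indexing; a negative index counts from the end
val words = split(col("sentence"), " ")
val splitsNoUdf = sentences
  .withColumn("first_word", element_at(words, 1))
  .withColumn("last_word", element_at(words, -1))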