保留一列字符串 (Array[String, Int]) 中的特定字符并按组应用算法
Keep specific chars from a column of strings (Array[String, Int]) and apply algorithm per group
我有一个 csv 文件,其中包含 Z1:A、Z2:B 等以逗号分隔的值。
我想要:
1. 使用每个键的频率数创建键值(这部分已经完成)
2. 通过保留 Zx(其中 x 可以是任何整数)并丢弃冒号和后面的所有内容(:A 等)来重写我的数组(或者元组?)。
This 是我的虚拟文件(为简单起见,只有 Z1 和 Z2)。
所以我将我的 cvs 加载到一个 val
val example1 = sc.textFile("/Users/....../Spark_stack/example_1.csv")
然后我执行 map-reduce 以获得我想要的结果
val counts = example1.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_+_).collect
我不确定是否需要 .collect
,但这是从我的 "table".
调用整行或特定单元格的唯一方法
如果我打印那个
counts.foreach(println)
我得到:
scala> counts.foreach(println)
(Z1:C,5)
(Z1:E,3)
(Z1:A,10)
(Z2:B,2)
(Z2:A,2)
(Z1:D,4)
(Z2:C,1)
(Z1:B,24)
我想将其重写为:
(Z1,5)
(Z1,3)
(Z1,10)
(Z2,2)
(Z2,2)
(Z1,4)
(Z2,1)
(Z1,24)
一种可能的方法是使用 map
和 substring(0,2)
scala> counts.map(x => (x._1.substring(0,2),x._2))
res25: Array[(String, Int)] = Array((Z1,5), (Z1,3), (Z1,10), (Z2,2), (Z2,2), (Z1,4), (Z2,1), (Z1,24))
这里的问题是,我可能会得到一个点,因为 Z 的总数超过了 9,例如 Z15:A,甚至 Z123:D
因此我需要一些更动态的东西,它可以告诉 :
在哪里,substring
直到那个点。我的问题是不知道怎么写
如果我打电话:
scala> counts(1)._1.indexOfSlice(":")
res28: Int = 2
我得到了:
的职位,所以我可以这样申请:
scala> counts(1)._1.substring(0,counts(1)._1.indexOfSlice(":"))
res30: String = Z1
但我不知道如何将它应用于整个计数,而不仅仅是一行。我什至试过 foreach
但它没有用。
执行此操作后,我需要为每个单独的 Z1、Z2 等应用以下算法
以某种方式像这样以相反的顺序对其进行排序(虽然适用于单个 Zx,因此我需要按第 2 列 desc 和我的第一列进行排序)
val sorted = counts.sortBy(_._2).reverse
并为每个独特的 Zx 应用此
var h =0
for (i <- 0 to (sorted.length-1) ) { if ( sorted(i)._2 >= i+1) { h = i+1;}}
为了最终为每个 Zx 得到一个整数(上面 for 循环中的 var h)
对不起,如果它太复杂了,我是 scala-spark 的新手。
counts.map(x => (x._1.substring(0, x._1.indexOf(":")), x._2))
首先,绝对不要使用collect
。这将迫使您的所有数据返回给您的驱动程序,这将使机器不堪重负——除非您没有大量数据,在这种情况下您可以使用传统的 Scala 集合而不是 Spark。
所以让我们改用 DataFrame
API 和 functions
库:
import org.apache.spark.sql.functions._
sc.textFile("/Users/....../Spark_stack/example_1.csv")
.toDF("label","count")
.select(substring_index($"label", ":", 1).as("label"), $"count")
在这里,我将 RDD
转换为具有列 label
和 count
的 DataFrame
,然后使用库函数 substring_index
解析冒号前的内容。
如果您必须改用 RDD
s,那么您可以执行@sheunis 建议的操作(尽管在 RDD
而不是 collect
的结果上)或这样做:
sc.textFile("/Users/....../Spark_stack/example_1.csv").map {
case (label, count) => (label.split(":").head, count)
}
我有一个 csv 文件,其中包含 Z1:A、Z2:B 等以逗号分隔的值。 我想要: 1. 使用每个键的频率数创建键值(这部分已经完成) 2. 通过保留 Zx(其中 x 可以是任何整数)并丢弃冒号和后面的所有内容(:A 等)来重写我的数组(或者元组?)。
This 是我的虚拟文件(为简单起见,只有 Z1 和 Z2)。
所以我将我的 cvs 加载到一个 val
val example1 = sc.textFile("/Users/....../Spark_stack/example_1.csv")
然后我执行 map-reduce 以获得我想要的结果
val counts = example1.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_+_).collect
我不确定是否需要 .collect
,但这是从我的 "table".
如果我打印那个
counts.foreach(println)
我得到:
scala> counts.foreach(println)
(Z1:C,5)
(Z1:E,3)
(Z1:A,10)
(Z2:B,2)
(Z2:A,2)
(Z1:D,4)
(Z2:C,1)
(Z1:B,24)
我想将其重写为:
(Z1,5)
(Z1,3)
(Z1,10)
(Z2,2)
(Z2,2)
(Z1,4)
(Z2,1)
(Z1,24)
一种可能的方法是使用 map
和 substring(0,2)
scala> counts.map(x => (x._1.substring(0,2),x._2))
res25: Array[(String, Int)] = Array((Z1,5), (Z1,3), (Z1,10), (Z2,2), (Z2,2), (Z1,4), (Z2,1), (Z1,24))
这里的问题是,我可能会得到一个点,因为 Z 的总数超过了 9,例如 Z15:A,甚至 Z123:D
因此我需要一些更动态的东西,它可以告诉 :
在哪里,substring
直到那个点。我的问题是不知道怎么写
如果我打电话:
scala> counts(1)._1.indexOfSlice(":")
res28: Int = 2
我得到了:
的职位,所以我可以这样申请:
scala> counts(1)._1.substring(0,counts(1)._1.indexOfSlice(":"))
res30: String = Z1
但我不知道如何将它应用于整个计数,而不仅仅是一行。我什至试过 foreach
但它没有用。
执行此操作后,我需要为每个单独的 Z1、Z2 等应用以下算法
以某种方式像这样以相反的顺序对其进行排序(虽然适用于单个 Zx,因此我需要按第 2 列 desc 和我的第一列进行排序)
val sorted = counts.sortBy(_._2).reverse
并为每个独特的 Zx 应用此
var h =0
for (i <- 0 to (sorted.length-1) ) { if ( sorted(i)._2 >= i+1) { h = i+1;}}
为了最终为每个 Zx 得到一个整数(上面 for 循环中的 var h)
对不起,如果它太复杂了,我是 scala-spark 的新手。
counts.map(x => (x._1.substring(0, x._1.indexOf(":")), x._2))
首先,绝对不要使用collect
。这将迫使您的所有数据返回给您的驱动程序,这将使机器不堪重负——除非您没有大量数据,在这种情况下您可以使用传统的 Scala 集合而不是 Spark。
所以让我们改用 DataFrame
API 和 functions
库:
import org.apache.spark.sql.functions._
sc.textFile("/Users/....../Spark_stack/example_1.csv")
.toDF("label","count")
.select(substring_index($"label", ":", 1).as("label"), $"count")
在这里,我将 RDD
转换为具有列 label
和 count
的 DataFrame
,然后使用库函数 substring_index
解析冒号前的内容。
如果您必须改用 RDD
s,那么您可以执行@sheunis 建议的操作(尽管在 RDD
而不是 collect
的结果上)或这样做:
sc.textFile("/Users/....../Spark_stack/example_1.csv").map {
case (label, count) => (label.split(":").head, count)
}