保留一列字符串 (Array[String, Int]) 中的特定字符并按组应用算法

Keep specific chars from a column of strings (Array[String, Int]) and apply algorithm per group

我有一个 csv 文件,其中包含 Z1:A、Z2:B 等以逗号分隔的值。 我想要: 1. 使用每个键的频率数创建键值(这部分已经完成) 2. 通过保留 Zx(其中 x 可以是任何整数)并丢弃冒号和后面的所有内容(:A 等)来重写我的数组(或者元组?)。

This 是我的虚拟文件(为简单起见,只有 Z1 和 Z2)。

所以我将我的 cvs 加载到一个 val

val example1 = sc.textFile("/Users/....../Spark_stack/example_1.csv")

然后我执行 map-reduce 以获得我想要的结果

val counts = example1.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_+_).collect

我不确定是否需要 .collect,但这是从我的 "table".

调用整行或特定单元格的唯一方法

如果我打印那个

counts.foreach(println)

我得到:

scala> counts.foreach(println)
(Z1:C,5)
(Z1:E,3)
(Z1:A,10)
(Z2:B,2)
(Z2:A,2)
(Z1:D,4)
(Z2:C,1)
(Z1:B,24)

我想将其重写为:

(Z1,5)
(Z1,3)
(Z1,10)
(Z2,2)
(Z2,2)
(Z1,4)
(Z2,1)
(Z1,24)

一种可能的方法是使用 mapsubstring(0,2)

scala> counts.map(x => (x._1.substring(0,2),x._2))
res25: Array[(String, Int)] = Array((Z1,5), (Z1,3), (Z1,10), (Z2,2), (Z2,2), (Z1,4), (Z2,1), (Z1,24))

这里的问题是,我可能会得到一个点,因为 Z 的总数超过了 9,例如 Z15:A,甚至 Z123:D

因此我需要一些更动态的东西,它可以告诉 : 在哪里,substring 直到那个点。我的问题是不知道怎么写

如果我打电话:

scala> counts(1)._1.indexOfSlice(":")
res28: Int = 2

我得到了:的职位,所以我可以这样申请:

scala> counts(1)._1.substring(0,counts(1)._1.indexOfSlice(":"))
res30: String = Z1

但我不知道如何将它应用于整个计数,而不仅仅是一行。我什至试过 foreach 但它没有用。

执行此操作后,我需要为每个单独的 Z1、Z2 等应用以下算法

以某种方式像这样以相反的顺序对其进行排序(虽然适用于单个 Zx,因此我需要按第 2 列 desc 和我的第一列进行排序)

val sorted = counts.sortBy(_._2).reverse

并为每个独特的 Zx 应用此

var h =0
for (i <- 0 to (sorted.length-1) ) { if ( sorted(i)._2 >= i+1) { h = i+1;}}

为了最终为每个 Zx 得到一个整数(上面 for 循环中的 var h)

对不起,如果它太复杂了,我是 scala-spark 的新手。

counts.map(x => (x._1.substring(0, x._1.indexOf(":")), x._2))

首先,绝对不要使用collect。这将迫使您的所有数据返回给您的驱动程序,这将使机器不堪重负——除非您没有大量数据,在这种情况下您可以使用传统的 Scala 集合而不是 Spark。

所以让我们改用 DataFrame API 和 functions 库:

import org.apache.spark.sql.functions._

sc.textFile("/Users/....../Spark_stack/example_1.csv")
  .toDF("label","count")
  .select(substring_index($"label", ":", 1).as("label"), $"count")

在这里,我将 RDD 转换为具有列 labelcountDataFrame,然后使用库函数 substring_index 解析冒号前的内容。

如果您必须改用 RDDs,那么您可以执行@sheunis 建议的操作(尽管在 RDD 而不是 collect 的结果上)或这样做:

sc.textFile("/Users/....../Spark_stack/example_1.csv").map {
    case (label, count) => (label.split(":").head, count)
}