使用Apache Spark RDD map方法(Java API)产生非柱状结果

Using the Apache Spark RDD map method (Java API) to produce a non-columnar result

请注意:相信我在这里尝试使用RDD map方法是正确的,但是如果还有另一种方法可以完成我正在寻找的东西,我洗耳恭听!


全新的 Spark 2。4.x 这里,并使用 Java不是 Scala) API.

我正在努力思考 RDD map(...) 方法,特别是 Datasets 而不仅限于 RDD。 official docs 中使用它的典型例子是:

public class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());

因此,在这种情况下,在创建 lines RDD 之后,它似乎只有一个列(我不确定其名称),其中每个列值都是不同的行文件,并且 RDD 中的每一行也代表文件的不同行。意思是,lines 是一个 nx1 矩阵,其中 n 是文件中 rows/lines 的数量。

似乎在执行 GetLength 函数时,它被输入每一行的唯一列作为输入字符串,并且 returns 是一个表示该行长度的整数字符串作为不同数据集中的新列值,它也是 nx1(只是保存行长度信息而不是实际的 lines/strings)。

好的,我得到了那个简单的例子。但是,如果我们有 nxm 个数据集,也就是说,很多行和很多列,并且我们想编写将它们转换为其他 nxm 个数据集的函数怎么办?

例如,假设我有以下 "input" 数据集:

+-------------------------+
| price | color | fizz    |
+-------------------------+
| 2.99  | red   | hallo   |
| 13.94 | blue  | yahtzee |
| 7.19  | red   | trueth  |
...
| 4.15  | green | whatevs |
+-------------------------+

其中 price 是 numeric/floating-point 类型,colorfizz 都是字符串。所以这里我们有一个 nx3 形状的数据集; n 行,每行总是 3 列。

我如何编写一个地图函数,它也 returned 一个 nx3 数据集,具有相同的 columns/column names/schema,但不同的值(基于函数)?

例如,假设我想要一个具有相同架构的新 nx3 数据集,但如果该行的 color 值等于 price 列,则添加 2.0字符串 "red"?

因此,使用上面的任意数据集,这个映射函数产生的新数据集看起来像:

+-------------------------+
| price | color | fizz    |
+-------------------------+
| 4.99  | red   | hallo   |  <== added 2.0 to price since color == "red"
| 13.94 | blue  | yahtzee |
| 9.19  | red   | trueth  |  <== added 2.0 to price since color == "red"
...
| 4.15  | green | whatevs |
+-------------------------+

很想做这样的事情:

public class UpdatedPriceForRedColors implements Function2<String, Double, Double> {
  public Double call(String color, Double currentPrice) {

    if ("red".equals(color) {
        return currentPrice + 2.0;
    } else {
        return currentPrice;
    }
  }
}

JavaRDD<Double> updatedPrices = myDS.map(new UpdatedPriceForRedColors());

但是,这里有几个问题:

  1. updatedPrices 现在只是一个 nx1 数据集,由 myDS 中每一行的正确计算价格组成,而我想要具有相同 price/color/fizz 的东西看起来像上面的第二个任意数据集
  2. UpdatedPriceForRedColors 如何知道它的第一个字符串参数是 color 列,而 而不是 fizz 列?
  3. 函数 API 似乎只能达到 Function5Function6(很难辨别 Java API 可以使用什么以及 Scala 独有的内容 API)。这意味着我只能编写接受 5 或 6 个参数的函数,而我可能有包含 10 多个列的数据集,我可能非常需要将这些列值 "injected" 中的大部分放入函数中,以便我可以计算新数据集的 return 值。在这种情况下我有哪些选择?

首先,RDD 总是有一个 ,因为 RDD 没有模式信息,因此你被绑定到 [=15] 中的 T 类型=].

选项 1 是使用 Function<String,String> 解析 RDD<String> 中的 String,执行逻辑来操作中的内部元素字符串,returns 一个更新的字符串。

如果您希望 RDD 包含一些模式信息,您可以使用 RDD<Row> 来访问 Row 中的单独元素(选项 2 ).

import org.apache.spark.sql.Row
JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
    @Override
    public Row call(String line) throws Exception {
      String[] parts = line.split("\t");//tab separated values
      Row row = RowFactory.create(parts[0], parts[1], parts[2]);
      return row;
    }
  });

现在您可以映射行:

RDD<Row> updatedRdd = rddRow.map(new Function<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
      Float price = row.get(0);
      String color = row.get(1);
      //Your logic here          
      Row row = RowFactory.create(/* three elements here, or whatever*/);
      return row;
    }
  });

如果更进一步,您可以使用真正的数据集(如 所述)并利用 Dataframe/Dataset API (选项 3).

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema = DataTypes.createStructType(
    new StructField[]{
            DataTypes.createStructField("price", FloatType, false),
            DataTypes.createStructField("color", StringType, false),
            DataTypes.createStructField("fizz", StringType, false)
    });


JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
    @Override
    public Row call(String line) throws Exception {
      String[] parts = line.split("\t");//tab separated values
      Row row = RowFactory.create(parts[0], parts[1], parts[2]);
      return row;
    }
  });

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

有了数据框,您现在可以使用这样的东西:

DataFrame df2 = df.withColumn("price", 
    when(col("color").equals("red"), col("price").add(2f))
        .otherwise(col("price")));

免责声明:我没有检查 java 语法和 API 因为我习惯了 scala。