使用Apache Spark RDD map方法(Java API)产生非柱状结果
Using the Apache Spark RDD map method (Java API) to produce a non-columnar result
请注意:我相信我在这里尝试使用RDD map
方法是正确的,但是如果还有另一种方法可以完成我正在寻找的东西,我洗耳恭听!
全新的 Spark 2。4.x 这里,并使用 Java(不是 Scala) API.
我正在努力思考 RDD map(...)
方法,特别是 Datasets
而不仅限于 RDD。 official docs 中使用它的典型例子是:
public class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
因此,在这种情况下,在创建 lines
RDD 之后,它似乎只有一个列(我不确定其名称),其中每个列值都是不同的行文件,并且 RDD 中的每一行也代表文件的不同行。意思是,lines
是一个 nx1
矩阵,其中 n
是文件中 rows/lines 的数量。
似乎在执行 GetLength
函数时,它被输入每一行的唯一列作为输入字符串,并且 returns 是一个表示该行长度的整数字符串作为不同数据集中的新列值,它也是 nx1
(只是保存行长度信息而不是实际的 lines/strings)。
好的,我得到了那个简单的例子。但是,如果我们有 nxm
个数据集,也就是说,很多行和很多列,并且我们想编写将它们转换为其他 nxm
个数据集的函数怎么办?
例如,假设我有以下 "input" 数据集:
+-------------------------+
| price | color | fizz |
+-------------------------+
| 2.99 | red | hallo |
| 13.94 | blue | yahtzee |
| 7.19 | red | trueth |
...
| 4.15 | green | whatevs |
+-------------------------+
其中 price
是 numeric/floating-point 类型,color
和 fizz
都是字符串。所以这里我们有一个 nx3
形状的数据集; n
行,每行总是 3 列。
我如何编写一个地图函数,它也 returned 一个 nx3
数据集,具有相同的 columns/column names/schema,但不同的值(基于函数)?
例如,假设我想要一个具有相同架构的新 nx3
数据集,但如果该行的 color
值等于 price
列,则添加 2.0
字符串 "red"
?
因此,使用上面的任意数据集,这个映射函数产生的新数据集看起来像:
+-------------------------+
| price | color | fizz |
+-------------------------+
| 4.99 | red | hallo | <== added 2.0 to price since color == "red"
| 13.94 | blue | yahtzee |
| 9.19 | red | trueth | <== added 2.0 to price since color == "red"
...
| 4.15 | green | whatevs |
+-------------------------+
我很想做这样的事情:
public class UpdatedPriceForRedColors implements Function2<String, Double, Double> {
public Double call(String color, Double currentPrice) {
if ("red".equals(color) {
return currentPrice + 2.0;
} else {
return currentPrice;
}
}
}
JavaRDD<Double> updatedPrices = myDS.map(new UpdatedPriceForRedColors());
但是,这里有几个问题:
updatedPrices
现在只是一个 nx1
数据集,由 myDS
中每一行的正确计算价格组成,而我想要具有相同 price/color/fizz
的东西看起来像上面的第二个任意数据集
UpdatedPriceForRedColors
如何知道它的第一个字符串参数是 color
列,而 而不是 fizz
列?
- 函数 API 似乎只能达到
Function5
或 Function6
(很难辨别 Java API 可以使用什么以及 Scala 独有的内容 API)。这意味着我只能编写接受 5 或 6 个参数的函数,而我可能有包含 10 多个列的数据集,我可能非常需要将这些列值 "injected" 中的大部分放入函数中,以便我可以计算新数据集的 return 值。在这种情况下我有哪些选择?
首先,RDD 总是有一个 列 ,因为 RDD 没有模式信息,因此你被绑定到 [=15] 中的 T
类型=].
选项 1 是使用 Function<String,String>
解析 RDD<String>
中的 String
,执行逻辑来操作中的内部元素字符串,returns 一个更新的字符串。
如果您希望 RDD
包含一些模式信息,您可以使用 RDD<Row>
来访问 Row
中的单独元素(选项 2 ).
import org.apache.spark.sql.Row
JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
@Override
public Row call(String line) throws Exception {
String[] parts = line.split("\t");//tab separated values
Row row = RowFactory.create(parts[0], parts[1], parts[2]);
return row;
}
});
现在您可以映射行:
RDD<Row> updatedRdd = rddRow.map(new Function<Row, Row>() {
@Override
public Row call(Row row) throws Exception {
Float price = row.get(0);
String color = row.get(1);
//Your logic here
Row row = RowFactory.create(/* three elements here, or whatever*/);
return row;
}
});
如果更进一步,您可以使用真正的数据集(如 所述)并利用 Dataframe/Dataset API (选项 3).
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StructType schema = DataTypes.createStructType(
new StructField[]{
DataTypes.createStructField("price", FloatType, false),
DataTypes.createStructField("color", StringType, false),
DataTypes.createStructField("fizz", StringType, false)
});
JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
@Override
public Row call(String line) throws Exception {
String[] parts = line.split("\t");//tab separated values
Row row = RowFactory.create(parts[0], parts[1], parts[2]);
return row;
}
});
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
有了数据框,您现在可以使用这样的东西:
DataFrame df2 = df.withColumn("price",
when(col("color").equals("red"), col("price").add(2f))
.otherwise(col("price")));
免责声明:我没有检查 java 语法和 API 因为我习惯了 scala。
请注意:我相信我在这里尝试使用RDD map
方法是正确的,但是如果还有另一种方法可以完成我正在寻找的东西,我洗耳恭听!
全新的 Spark 2。4.x 这里,并使用 Java(不是 Scala) API.
我正在努力思考 RDD map(...)
方法,特别是 Datasets
而不仅限于 RDD。 official docs 中使用它的典型例子是:
public class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
因此,在这种情况下,在创建 lines
RDD 之后,它似乎只有一个列(我不确定其名称),其中每个列值都是不同的行文件,并且 RDD 中的每一行也代表文件的不同行。意思是,lines
是一个 nx1
矩阵,其中 n
是文件中 rows/lines 的数量。
似乎在执行 GetLength
函数时,它被输入每一行的唯一列作为输入字符串,并且 returns 是一个表示该行长度的整数字符串作为不同数据集中的新列值,它也是 nx1
(只是保存行长度信息而不是实际的 lines/strings)。
好的,我得到了那个简单的例子。但是,如果我们有 nxm
个数据集,也就是说,很多行和很多列,并且我们想编写将它们转换为其他 nxm
个数据集的函数怎么办?
例如,假设我有以下 "input" 数据集:
+-------------------------+
| price | color | fizz |
+-------------------------+
| 2.99 | red | hallo |
| 13.94 | blue | yahtzee |
| 7.19 | red | trueth |
...
| 4.15 | green | whatevs |
+-------------------------+
其中 price
是 numeric/floating-point 类型,color
和 fizz
都是字符串。所以这里我们有一个 nx3
形状的数据集; n
行,每行总是 3 列。
我如何编写一个地图函数,它也 returned 一个 nx3
数据集,具有相同的 columns/column names/schema,但不同的值(基于函数)?
例如,假设我想要一个具有相同架构的新 nx3
数据集,但如果该行的 color
值等于 price
列,则添加 2.0
字符串 "red"
?
因此,使用上面的任意数据集,这个映射函数产生的新数据集看起来像:
+-------------------------+
| price | color | fizz |
+-------------------------+
| 4.99 | red | hallo | <== added 2.0 to price since color == "red"
| 13.94 | blue | yahtzee |
| 9.19 | red | trueth | <== added 2.0 to price since color == "red"
...
| 4.15 | green | whatevs |
+-------------------------+
我很想做这样的事情:
public class UpdatedPriceForRedColors implements Function2<String, Double, Double> {
public Double call(String color, Double currentPrice) {
if ("red".equals(color) {
return currentPrice + 2.0;
} else {
return currentPrice;
}
}
}
JavaRDD<Double> updatedPrices = myDS.map(new UpdatedPriceForRedColors());
但是,这里有几个问题:
updatedPrices
现在只是一个nx1
数据集,由myDS
中每一行的正确计算价格组成,而我想要具有相同price/color/fizz
的东西看起来像上面的第二个任意数据集UpdatedPriceForRedColors
如何知道它的第一个字符串参数是color
列,而 而不是fizz
列?- 函数 API 似乎只能达到
Function5
或Function6
(很难辨别 Java API 可以使用什么以及 Scala 独有的内容 API)。这意味着我只能编写接受 5 或 6 个参数的函数,而我可能有包含 10 多个列的数据集,我可能非常需要将这些列值 "injected" 中的大部分放入函数中,以便我可以计算新数据集的 return 值。在这种情况下我有哪些选择?
首先,RDD 总是有一个 列 ,因为 RDD 没有模式信息,因此你被绑定到 [=15] 中的 T
类型=].
选项 1 是使用 Function<String,String>
解析 RDD<String>
中的 String
,执行逻辑来操作中的内部元素字符串,returns 一个更新的字符串。
如果您希望 RDD
包含一些模式信息,您可以使用 RDD<Row>
来访问 Row
中的单独元素(选项 2 ).
import org.apache.spark.sql.Row
JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
@Override
public Row call(String line) throws Exception {
String[] parts = line.split("\t");//tab separated values
Row row = RowFactory.create(parts[0], parts[1], parts[2]);
return row;
}
});
现在您可以映射行:
RDD<Row> updatedRdd = rddRow.map(new Function<Row, Row>() {
@Override
public Row call(Row row) throws Exception {
Float price = row.get(0);
String color = row.get(1);
//Your logic here
Row row = RowFactory.create(/* three elements here, or whatever*/);
return row;
}
});
如果更进一步,您可以使用真正的数据集(如
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StructType schema = DataTypes.createStructType(
new StructField[]{
DataTypes.createStructField("price", FloatType, false),
DataTypes.createStructField("color", StringType, false),
DataTypes.createStructField("fizz", StringType, false)
});
JavaRDD<Row> rddRow = rddString.map(new Function<String, Row>() {
@Override
public Row call(String line) throws Exception {
String[] parts = line.split("\t");//tab separated values
Row row = RowFactory.create(parts[0], parts[1], parts[2]);
return row;
}
});
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
有了数据框,您现在可以使用这样的东西:
DataFrame df2 = df.withColumn("price",
when(col("color").equals("red"), col("price").add(2f))
.otherwise(col("price")));
免责声明:我没有检查 java 语法和 API 因为我习惯了 scala。