Spark RDD 映射 1 到多
Spark RDD map 1 to many
我是 spark 的新手,遇到了问题。我正在处理一个用 textFile() 生成的 RDD,它是一个 csv 文件。对于每一行,我想 return 多行到一个新的 RDD(一个而不是多个)。这是我的代码:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
我在这里做的是过滤初始csv以仅获取LinearAccelerationEvent,然后我想将这些对象映射到LinearAccelerationEvent class并生成一个新的LinearAccelerationEvent对象的RDD。对于初始 csv 文件的每一行,我必须生成多个 LinearAccelerometerEvent 对象,但我不知道该怎么做。之所以要这么做,是因为后面这个RDD会像这样push到cassandra:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
所以理想的解决方案应该是这样的:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
我可以使用 foreachPartition()
函数并将 for 循环的每个事件推送到 Cassandra,但我发现这种方法要慢得多。是否可以不让用户 foreach 做我想做的事?谢谢
如果我没理解错的话,return LinearAccelerationEvent
的集合(例如 List)并调用 flatMap
而不是 map
。这将为每个加速事件在结果 RDD 中生成一个值。
flatMap 与调用 map 后跟 flatten 相同。如果您熟悉 Hive,它类似于使用 HiveQL 中可用的 explode DTF。
我是 spark 的新手,遇到了问题。我正在处理一个用 textFile() 生成的 RDD,它是一个 csv 文件。对于每一行,我想 return 多行到一个新的 RDD(一个而不是多个)。这是我的代码:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
我在这里做的是过滤初始csv以仅获取LinearAccelerationEvent,然后我想将这些对象映射到LinearAccelerationEvent class并生成一个新的LinearAccelerationEvent对象的RDD。对于初始 csv 文件的每一行,我必须生成多个 LinearAccelerometerEvent 对象,但我不知道该怎么做。之所以要这么做,是因为后面这个RDD会像这样push到cassandra:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
所以理想的解决方案应该是这样的:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
我可以使用 foreachPartition()
函数并将 for 循环的每个事件推送到 Cassandra,但我发现这种方法要慢得多。是否可以不让用户 foreach 做我想做的事?谢谢
如果我没理解错的话,return LinearAccelerationEvent
的集合(例如 List)并调用 flatMap
而不是 map
。这将为每个加速事件在结果 RDD 中生成一个值。
flatMap 与调用 map 后跟 flatten 相同。如果您熟悉 Hive,它类似于使用 HiveQL 中可用的 explode DTF。