在 Java 中应用 Map Reduce
Apply Map Reduce in Java
我是流的初学者,map
、reduce
和 filter
。
我正在从我的 Cassandra table 中获取行列表,其中包含三个字段 vehicleType
、noOfVehicles
和 taxPerParticularVehicleType
.
我想准备一组这 3 个三元组,以便添加任何特定类型的车辆数量,同时三元组还应包含特定车辆类型的税收的算术平均值。
我正在应用我的映射,例如:
session.execute(statement).all().stream()
.map(row -> new ImmutablePair<>(row.getString("vehicleType"), new ImmutablePair<>(row.getInt("noOfVehicles"), row.getFloat("tollTaxOfParticularType") * row.getInt("noOfVehicles"))))
.reduce(x->{
});
而且我无法应用 reduce 以将其添加到以下集合中:
Set<Triple<String,Integer,Double>> set = new HashSet<>();
我正在举例说明我想通过 Map-Reduce 实现的目标:
我正在映射来自我的 table 的三个字段(vehicleType、noOfVehicle、taxOfParticularVehicle),例如:
(vehicleType,(noOfVehicle,noOfVehicle*taxOfParticularVehicle))
假设映射给我一个这样的数组:
[("A",(12,48)),("A",(10,30)),("B",(3,30)),("B",(4,70))]
最后我想把它缩减为以下一组:
[("A",22,39),("B",7,50)]
这样就不会对 noOfVehicles 求和,而税是组中车辆税的算术平均值。
如果不流式传输不止一次或在外部维护可变状态,这有点棘手。这些方法最简洁的替代方法似乎是编写自定义 Collector
.
我对 Pair
、Triple
不太满意,所以我使用具体的 classes 来说明:
Data
是单个数据点的持有者,对应于您的数据三元组。
static final class Data {
final String type;
final int noOfVehicles;
final double totalTax;
Data(String type, int noOfVehicles, double totalTax) {
this.type = type;
this.noOfVehicles = noOfVehicles;
this.totalTax = totalTax;
}
}
接下来,我们需要一个助手 class 来保存可变缩减期间的状态,我称之为 Stats
:
static final class Stats {
int noOfVehiclesSum;
double totalTaxSum;
int count;
@Override
public String toString() {
return "Stats{" + "noOfVehiclesSum=" + noOfVehiclesSum +
", averageTax=" + (totalTaxSum / count) + '}';
}
}
让我们创建一个测试数据列表
List<Data> l = Arrays.asList(new Data("A", 12, 48.0),
new Data("A", 10, 30.0),
new Data("B", 3 , 30.0),
new Data("B", 4 , 70.0),
new Data("B", 5 , 20.0));
作为减少的最终结果,我想要的是一个 Map<String, Stats>
,其中包含从 vehicleType 到该类型的 Stats
对象的映射(包含车辆数量和税收的总和该类型的平均值)。
在这个例子中:{A=Stats{noOfVehiclesSum=22, averageTax=39.0}, B=Stats{noOfVehiclesSum=12, averageTax=40.0}}
我不知道有什么比编写您自己的自定义 Collector
更好的解决方案了,在此示例中,它看起来有点像以下内容:
static class StatsCollector implements Collector<Data, Stats, Stats> {
@Override
public Supplier<Stats> supplier() {
return Stats::new;
}
@Override
public BiConsumer<Stats, Data> accumulator() {
return (stats, data) -> {
stats.noOfVehiclesSum += data.noOfVehicles;
stats.totalTaxSum += data.totalTax;
stats.count += 1;
};
}
@Override
public BinaryOperator<Stats> combiner() {
return (lft, rght) -> {
lft.noOfVehiclesSum += rght.noOfVehiclesSum;
lft.totalTaxSum += rght.totalTaxSum;
lft.count += rght.count;
return lft;
};
}
@Override
public Function<Stats, Stats> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Collector.Characteristics.IDENTITY_FINISH);
}
}
最后,在所有这些管道之后,您将能够编写
Map<String, Stats> result = l.stream()
.collect(Collectors.groupingBy(data -> data.type,
new StatsCollector()));
并获得所需的映射。
我是流的初学者,map
、reduce
和 filter
。
我正在从我的 Cassandra table 中获取行列表,其中包含三个字段 vehicleType
、noOfVehicles
和 taxPerParticularVehicleType
.
我想准备一组这 3 个三元组,以便添加任何特定类型的车辆数量,同时三元组还应包含特定车辆类型的税收的算术平均值。
我正在应用我的映射,例如:
session.execute(statement).all().stream()
.map(row -> new ImmutablePair<>(row.getString("vehicleType"), new ImmutablePair<>(row.getInt("noOfVehicles"), row.getFloat("tollTaxOfParticularType") * row.getInt("noOfVehicles"))))
.reduce(x->{
});
而且我无法应用 reduce 以将其添加到以下集合中:
Set<Triple<String,Integer,Double>> set = new HashSet<>();
我正在举例说明我想通过 Map-Reduce 实现的目标:
我正在映射来自我的 table 的三个字段(vehicleType、noOfVehicle、taxOfParticularVehicle),例如:
(vehicleType,(noOfVehicle,noOfVehicle*taxOfParticularVehicle))
假设映射给我一个这样的数组:
[("A",(12,48)),("A",(10,30)),("B",(3,30)),("B",(4,70))]
最后我想把它缩减为以下一组:
[("A",22,39),("B",7,50)]
这样就不会对 noOfVehicles 求和,而税是组中车辆税的算术平均值。
如果不流式传输不止一次或在外部维护可变状态,这有点棘手。这些方法最简洁的替代方法似乎是编写自定义 Collector
.
我对 Pair
、Triple
不太满意,所以我使用具体的 classes 来说明:
Data
是单个数据点的持有者,对应于您的数据三元组。
static final class Data {
final String type;
final int noOfVehicles;
final double totalTax;
Data(String type, int noOfVehicles, double totalTax) {
this.type = type;
this.noOfVehicles = noOfVehicles;
this.totalTax = totalTax;
}
}
接下来,我们需要一个助手 class 来保存可变缩减期间的状态,我称之为 Stats
:
static final class Stats {
int noOfVehiclesSum;
double totalTaxSum;
int count;
@Override
public String toString() {
return "Stats{" + "noOfVehiclesSum=" + noOfVehiclesSum +
", averageTax=" + (totalTaxSum / count) + '}';
}
}
让我们创建一个测试数据列表
List<Data> l = Arrays.asList(new Data("A", 12, 48.0),
new Data("A", 10, 30.0),
new Data("B", 3 , 30.0),
new Data("B", 4 , 70.0),
new Data("B", 5 , 20.0));
作为减少的最终结果,我想要的是一个 Map<String, Stats>
,其中包含从 vehicleType 到该类型的 Stats
对象的映射(包含车辆数量和税收的总和该类型的平均值)。
在这个例子中:{A=Stats{noOfVehiclesSum=22, averageTax=39.0}, B=Stats{noOfVehiclesSum=12, averageTax=40.0}}
我不知道有什么比编写您自己的自定义 Collector
更好的解决方案了,在此示例中,它看起来有点像以下内容:
static class StatsCollector implements Collector<Data, Stats, Stats> {
@Override
public Supplier<Stats> supplier() {
return Stats::new;
}
@Override
public BiConsumer<Stats, Data> accumulator() {
return (stats, data) -> {
stats.noOfVehiclesSum += data.noOfVehicles;
stats.totalTaxSum += data.totalTax;
stats.count += 1;
};
}
@Override
public BinaryOperator<Stats> combiner() {
return (lft, rght) -> {
lft.noOfVehiclesSum += rght.noOfVehiclesSum;
lft.totalTaxSum += rght.totalTaxSum;
lft.count += rght.count;
return lft;
};
}
@Override
public Function<Stats, Stats> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Collector.Characteristics.IDENTITY_FINISH);
}
}
最后,在所有这些管道之后,您将能够编写
Map<String, Stats> result = l.stream()
.collect(Collectors.groupingBy(data -> data.type,
new StatsCollector()));
并获得所需的映射。