GridGain:具有节点本地数据处理的 MapReduce?
GridGain: MapReduce with node-local data processing?
我正在尝试对大型分布式数据集执行一些数值计算。这些算法非常适合 MapReduce 模型,另外 属性 映射步骤的输出与输入数据相比较小。数据可以被认为是只读的,并且静态分布在节点上(故障转移时的重新平衡除外)。请注意,这与将输入数据发送到执行映射步骤的节点的标准字数统计示例有些相反。
这意味着 map 步骤应在所有节点上并行执行,处理每个节点的本地数据,同时将 map 步骤的输出发送到一个节点以进行 reduce 步骤是可以接受的。
用 GridGain 实现这个的最佳方法是什么?
似乎在早期版本的GridGain中GridCache
/GridCacheProjection
接口上有一个reduce
(..)方法,但现在已经不存在了。有替代品吗?我正在考虑一种机制,它采用地图闭包并在每个数据 上恰好执行一次 ,同时避免通过网络复制任何输入数据。
到目前为止我想出的(有点手动的)方法如下:
public class GridBroadcastCountDemo {
public static void main(String[] args) throws GridException {
try (Grid grid = GridGain.start(CONFIG_FILE)) {
GridFuture<Collection<Integer>> future = grid.forRemotes().compute().broadcast(new GridCallable<Integer>() {
@Override
public Integer call() throws Exception {
GridCache<Integer, float[]> cache = grid.cache(CACHE_NAME);
int count = 0;
for (float[] array : cache.primaryValues()) {
count += array.length;
}
return count;
}
});
int totalCount = 0;
for (int count : future.get()) {
totalCount += count;
}
// expect size of input data
System.out.println(totalCount);
}
}
}
但是,不能保证使用这种方法每个数据都只处理一次。例如。当在执行 GridCallable
时发生重新平衡时,部分数据可能会被处理零次或多次。
GridGain Open Source(现在是 Apache Ignite)有 ComputeTask API,它有 map() 和 reduce() 方法。如果您正在寻找 reduce() 方法,那么 ComputeTask 绝对是适合您的 API。
目前您的实施没有问题。 Apache Ignite 正在添加一项功能,在该功能中,在迁移完全完成之前,节点不会被视为主要节点。应该快了。
我正在尝试对大型分布式数据集执行一些数值计算。这些算法非常适合 MapReduce 模型,另外 属性 映射步骤的输出与输入数据相比较小。数据可以被认为是只读的,并且静态分布在节点上(故障转移时的重新平衡除外)。请注意,这与将输入数据发送到执行映射步骤的节点的标准字数统计示例有些相反。
这意味着 map 步骤应在所有节点上并行执行,处理每个节点的本地数据,同时将 map 步骤的输出发送到一个节点以进行 reduce 步骤是可以接受的。
用 GridGain 实现这个的最佳方法是什么?
似乎在早期版本的GridGain中GridCache
/GridCacheProjection
接口上有一个reduce
(..)方法,但现在已经不存在了。有替代品吗?我正在考虑一种机制,它采用地图闭包并在每个数据 上恰好执行一次 ,同时避免通过网络复制任何输入数据。
到目前为止我想出的(有点手动的)方法如下:
public class GridBroadcastCountDemo {
public static void main(String[] args) throws GridException {
try (Grid grid = GridGain.start(CONFIG_FILE)) {
GridFuture<Collection<Integer>> future = grid.forRemotes().compute().broadcast(new GridCallable<Integer>() {
@Override
public Integer call() throws Exception {
GridCache<Integer, float[]> cache = grid.cache(CACHE_NAME);
int count = 0;
for (float[] array : cache.primaryValues()) {
count += array.length;
}
return count;
}
});
int totalCount = 0;
for (int count : future.get()) {
totalCount += count;
}
// expect size of input data
System.out.println(totalCount);
}
}
}
但是,不能保证使用这种方法每个数据都只处理一次。例如。当在执行 GridCallable
时发生重新平衡时,部分数据可能会被处理零次或多次。
GridGain Open Source(现在是 Apache Ignite)有 ComputeTask API,它有 map() 和 reduce() 方法。如果您正在寻找 reduce() 方法,那么 ComputeTask 绝对是适合您的 API。
目前您的实施没有问题。 Apache Ignite 正在添加一项功能,在该功能中,在迁移完全完成之前,节点不会被视为主要节点。应该快了。