OpenMPI Java Bindings Behavior Difference for lock, accumulate, get
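I have a use case that requires Java on our MPI research cluster. One specific piece of functionality I need is covered nicely in this question (the linked answer includes C++ code). I built the C++ code and it works exactly as expected.
I tried to build a Java equivalent of that code and failed badly. Although I have functionally replicated what the C++ code does, the Java version does not consistently return the expected results.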
mpiexec --oversubscribe -n 4 ./test
0 got counter 1
2 got counter 2
1 got counter 3
3 got counter 4
1 1 1 1
(Running with --oversubscribe on my local laptop.)
When I run my Java equivalent, I get nowhere near the same results:
mpirun --oversubscribe -n 4 java -cp .:/usr/local/lib/openmpi/mpi.jar CounterTest
0 got counter 1
3 got counter 1
1 got counter 3
2 got counter 2
1 1 1 1
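I expect each rank to get one and only one counter value. In this run, counter value 1 was handed out twice. Once in a blue moon I can get it to give me 1 through 4 (the order is not important; the unique count is).
We run version 2.1.0 on the cluster. On my local laptop I have both OpenMPI 2.1.0 and 3.1.0 (current) installed, and I can reproduce the correct behavior of the C++ program and the incorrect behavior of the Java program on either version.
Here is the Counter class I created: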
import java.nio.ByteBuffer;
import java.util.ArrayList;
import mpi.MPI;
import mpi.MPIException;
import mpi.Win;

public class Counter {
    Win win;
    int hostRank;
    int myVal;
    ByteBuffer data;
    int rank;
    int size;

    public Counter(int hostRank) throws MPIException {
        this.setHostRank(hostRank);
        this.setSize(MPI.COMM_WORLD.getSize());
        this.setRank(MPI.COMM_WORLD.getRank());
        if (this.getRank() == hostRank) {
            // this.setData(MPI.newByteBuffer(this.getSize() * Integer.BYTES));
            this.setData(ByteBuffer.allocateDirect(this.getSize() * Integer.BYTES));
            for (int i = 0; i < this.getData().capacity(); i += Integer.BYTES)
                this.getData().putInt(i, 0);
        } else {
            // this.setData(MPI.newByteBuffer(0));
            this.setData(ByteBuffer.allocateDirect(0));
        }
        this.setWin(new Win(this.getData(), this.getData().capacity(), Integer.BYTES,
                MPI.INFO_NULL, MPI.COMM_WORLD));
        this.setMyVal(0);
    }

    public int increment(int increment) throws MPIException {
        // A list to store all of the values we pull
        ArrayList<Integer> vals = new ArrayList<Integer>();
        for (int i = 0; i < this.getSize(); i++)
            vals.add(i, 0);
        // Need to convert the increment to a buffer
        ByteBuffer incrbuff = ByteBuffer.allocateDirect(Integer.BYTES);
        incrbuff.putInt(increment);
        // Our values are returned to us in a byte buffer
        ByteBuffer valBuff = ByteBuffer.allocateDirect(Integer.BYTES);
        // System.out.printf("Data for RANK %d: ", this.getRank());
        this.getWin().lock(MPI.LOCK_EXCLUSIVE, this.getHostRank(), 0);
        for (int i = 0; i < this.getSize(); i++) {
            // Always ensure that we're at the top of the buffer
            valBuff.position(0);
            if (i == this.getRank()) {
                this.getWin().accumulate(incrbuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT, MPI.SUM);
                // Without this, it comes back all 1s
                this.getWin().flushLocalAll();
                // System.out.printf(" [%d] ", this.getMyVal() + increment);
            } else {
                this.getWin().get(valBuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT);
                vals.set(i, valBuff.getInt(0));
                // System.out.printf(" %d ", vals.get(i))
            }
        }
        this.getWin().unlock(this.getHostRank());
        this.setMyVal(this.getMyVal() + increment);
        vals.set(this.getRank(), this.getMyVal());
        // System.out.printf(" <<%d>> \n", vals.stream().mapToInt(Integer::intValue).sum());
        // this.getWin().unlock(this.getHostRank());
        return vals.stream().mapToInt(Integer::intValue).sum();
    }

    public void printCounter() {
        if (this.getRank() == this.getHostRank()) {
            for (int i = 0; i < this.getSize(); i++) {
                System.out.printf(" %d ", this.getData().getInt());
            }
            System.out.println("");
        }
    }

    public void delete() throws MPIException {
        this.getWin().detach(this.getData());
        this.getWin().free();
        this.setData(null);
        this.setHostRank(0);
        this.setMyVal(0);
        this.setRank(0);
        this.setSize(0);
        this.setWin(null);
    }

    private Win getWin() {
        return win;
    }

    private void setWin(Win win) {
        this.win = win;
    }

    private int getHostRank() {
        return hostRank;
    }

    private void setHostRank(int hostrank) {
        this.hostRank = hostrank;
    }

    private int getMyVal() {
        return myVal;
    }

    private void setMyVal(int myval) {
        this.myVal = myval;
    }

    private ByteBuffer getData() {
        return data;
    }

    private void setData(ByteBuffer data) {
        this.data = data;
    }

    private int getRank() {
        return rank;
    }

    private void setRank(int rank) {
        this.rank = rank;
    }

    private int getSize() {
        return size;
    }

    private void setSize(int size) {
        this.size = size;
    }
}
Also note that the Java code contains something the C++ code does not:
this.getWin().flushLocalAll();
Without this, the counter for every rank comes back as "1".
Here is the first part of the test class as well:
import java.util.Random;
import mpi.*;

public class CounterTest {
    public static void main(String[] args) {
        try {
            MPI.Init(args);
        } catch (MPIException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        try {
            test1();
            // test2();
        } catch (MPIException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        try {
            MPI.Finalize();
        } catch (MPIException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public static void test1 () throws MPIException {
        Counter c = new Counter(0);
        int rank = MPI.COMM_WORLD.getRank();
        int size = MPI.COMM_WORLD.getSize();
        int result = c.increment(1);
        System.out.printf("%d got counter %d\n", rank, result);
        MPI.COMM_WORLD.barrier();
        c.printCounter();
        c.delete();
        c = null;
    }
}
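I have tried various other techniques, such as fences and using groups with MPI_Win_start() and MPI_Win_complete(), to no avail. I feel this is as close as I can get to a faithful rendition of the original C++ code.
What am I missing? Why does this behave differently from the native C++ code?
EDIT: I also found that I need to add this when running against the actual cluster (which had been down for maintenance for the last two days):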
this.getWin().flush(0);
I think the problem is in these lines:
this.getWin().get(valBuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT);
vals.set(i, valBuff.getInt(0));
My understanding is that you cannot assume the contents of valBuff are correct until MPI_Win_unlock() has been called.
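To illustrate why reading valBuff inside the lock can go wrong, here is a toy model in plain Java. ToyWindow and DeferredGetDemo are made-up names for this sketch and are not part of the Open MPI bindings. The model assumes the most pessimistic legal behavior, namely that every queued operation completes only at unlock; a real implementation may complete operations earlier, which would explain why the original code occasionally produced the right answer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an RMA window. NOT the Open MPI API.
class ToyWindow {
    private final int[] target;                              // memory exposed by the "host rank"
    private final List<Runnable> pending = new ArrayList<>(); // operations queued but not yet completed

    ToyWindow(int slots) {
        target = new int[slots];
    }

    // Models a nonblocking accumulate: the update is only queued here.
    void accumulate(int value, int disp) {
        pending.add(() -> target[disp] += value);
    }

    // Models a nonblocking get: the origin buffer is filled later, not now.
    void get(ByteBuffer origin, int disp) {
        pending.add(() -> origin.putInt(0, target[disp]));
    }

    // Models unlock: all queued operations are completed at this point.
    void unlock() {
        for (Runnable op : pending) {
            op.run();
        }
        pending.clear();
    }
}

class DeferredGetDemo {
    // Reads the origin buffer before unlock and therefore sees the stale initial value.
    static int readTooEarly() {
        ToyWindow win = new ToyWindow(2);
        win.accumulate(5, 1);
        win.unlock();                       // slot 1 now holds 5
        ByteBuffer buf = ByteBuffer.allocateDirect(Integer.BYTES);
        win.get(buf, 1);
        int early = buf.getInt(0);          // the get has not completed yet
        win.unlock();
        return early;
    }

    // Reads the origin buffer after unlock and sees the fetched value.
    static int readAfterUnlock() {
        ToyWindow win = new ToyWindow(2);
        win.accumulate(5, 1);
        win.unlock();                       // slot 1 now holds 5
        ByteBuffer buf = ByteBuffer.allocateDirect(Integer.BYTES);
        win.get(buf, 1);
        win.unlock();                       // the get completes here
        return buf.getInt(0);
    }

    public static void main(String[] args) {
        System.out.println("read before unlock: " + readTooEarly());    // prints 0
        System.out.println("read after unlock:  " + readAfterUnlock()); // prints 5
    }
}
```

In this model, reusing one origin buffer for several get calls would also leave only the last fetched value in it after unlock, which is why one buffer per get is the safer layout.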
I rewrote the subroutine to use multiple buffers and to set vals only after MPI_Win_unlock(), and was able to get the correct output.
public int increment(int increment) throws MPIException {
    // A list to store all of the values we pull
    ArrayList<Integer> vals = new ArrayList<Integer>();
    for (int i = 0; i < this.getSize(); i++)
        vals.add(i, 0);
    // Need to convert the increment to a buffer
    ByteBuffer incrbuff = ByteBuffer.allocateDirect(Integer.BYTES);
    incrbuff.putInt(increment);
    // Our values are returned to us in several byte buffers
    ByteBuffer valBuff[] = new ByteBuffer[this.getSize()];
    this.getWin().lock(MPI.LOCK_EXCLUSIVE, this.getHostRank(), 0);
    for (int i = 0; i < this.getSize(); i++) {
        if (i == this.getRank()) {
            this.getWin().accumulate(incrbuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT, MPI.SUM);
        } else {
            valBuff[i] = ByteBuffer.allocateDirect(Integer.BYTES);
            // Always ensure that we're at the top of the buffer
            valBuff[i].position(0);
            this.getWin().get(valBuff[i], 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT);
        }
    }
    this.getWin().unlock(this.getHostRank());
    // Only read the fetched values once the lock has been released
    for (int i = 0; i < this.getSize(); i++) {
        if (i != this.getRank()) {
            vals.set(i, valBuff[i].getInt(0));
        }
    }
    this.setMyVal(this.getMyVal() + increment);
    vals.set(this.getRank(), this.getMyVal());
    return vals.stream().mapToInt(Integer::intValue).sum();
}
Note that the following calls are no longer needed:
this.getWin().flushLocalAll();
this.getWin().flush(0);
FWIW, I tried using a single buffer of this.getSize() integers, but could not get it working.
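For anyone who wants to retry the single-buffer variant, one untested idea is to carve a per-slot direct view out of one larger direct buffer and pass each view to get. The sketch below only shows the plain java.nio part; SlicedBufferDemo and slot are hypothetical names, and whether the Open MPI Java bindings write to the start of such a view rather than to the start of the backing buffer is an assumption I have not verified.

```java
import java.nio.ByteBuffer;

class SlicedBufferDemo {
    // Returns a direct view covering only the 4 bytes of slot i in the shared buffer.
    // The view shares memory with 'whole', so writes through it are visible in 'whole'.
    static ByteBuffer slot(ByteBuffer whole, int i) {
        ByteBuffer dup = whole.duplicate();          // independent position/limit, same memory
        dup.position(i * Integer.BYTES);
        dup.limit((i + 1) * Integer.BYTES);
        return dup.slice();                          // index 0 of the slice is slot i
    }

    public static void main(String[] args) {
        int size = 4;                                // stands in for this.getSize()
        ByteBuffer whole = ByteBuffer.allocateDirect(size * Integer.BYTES);

        // Pretend a completed get wrote 42 through the view for slot 2.
        slot(whole, 2).putInt(0, 42);

        // After "unlock", every value is read back from the one shared buffer.
        for (int i = 0; i < size; i++) {
            System.out.printf("slot %d = %d%n", i, whole.getInt(i * Integer.BYTES));
        }
    }
}
```

If the bindings do not honor the view's offset, every get would land in slot 0, which would look much like the failure described above.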