MPI.NET 的 Send/ImmediateProbe 在使用多台主机时无法正常工作

MPI.NET Send/ImmediateProbe not working when using multiple hosts

我开发了一个 MPI 测试程序,其中主节点将工作分配给各个工作节点。工作节点使用 comm.Send() 请求工作,主节点使用 comm.ImmediateProbe 检查是否有工作节点正在请求工作。如果有请求到达,主节点就用 comm.Receive 读取该请求,并把工作发送给对应的工作节点进行处理。

当我使用 mpiexec.exe 在单台主机上运行测试程序时,无论是本地主机还是远程主机,一切都按预期工作;但是当我同时在两台主机上运行它时,远程主机上的 Send 调用会阻塞,主节点的 ImmediateProbe 永远收不到远程主机上的工作节点发送的消息。

我使用以下命令运行程序:`mpiexec.exe -wdir \\DESKTOP-58QONBS\MPITest -hosts 2 DESKTOP-58QONBS 2 LAPTOP-L8F7AN5R 1 MPITest.exe`

我是 MPI 的新手,所以也许是我做错了什么,但我就是想不通为什么同时使用两台主机时会出现这样的行为。

完整代码如下:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace MPITest
{
    /// <summary>
    /// Skeleton for a master/worker program built on MPI.NET.
    /// Rank 0 is the master: it runs <see cref="Main"/> on the calling thread while a
    /// background thread polls the other ranks for work requests and results.
    /// Every other rank is a worker: it repeatedly asks rank 0 for work, processes it
    /// and sends the result back.
    /// </summary>
    /// <typeparam name="TWork">Type of a unit of work sent from the master to a worker.</typeparam>
    /// <typeparam name="TResult">Type of a result sent from a worker back to the master.</typeparam>
    public abstract class MPIMasterWorkerBase<TWork, TResult>
        where TWork : class
        where TResult : class
    {
        /// <summary>Rank that plays the master role.</summary>
        private const int MasterRank = 0;

        /// <summary>
        /// Tag shared by the worker's request, the master's reply flag and the work payload.
        /// </summary>
        private const int DispatchTag = 0;

        /// <summary>Tag used for results travelling from a worker to the master.</summary>
        private const int ResultTag = 1;

        /// <summary>Reply flag: a work item follows in the next message.</summary>
        private const int WorkFollowsFlag = 1;

        /// <summary>Reply flag: nothing to do right now, ask again later.</summary>
        private const int NoWorkFlag = 0;

        /// <summary>Reply flag: the worker should leave its loop.</summary>
        private const int ShutdownFlag = -1;

        /// <summary>Pause between two polling rounds (master) or two requests (worker), in milliseconds.</summary>
        private const int PollIntervalMs = 1000;

        /// <summary>Called once per process before any thread is started.</summary>
        /// <param name="isMaster"><c>true</c> on rank 0, <c>false</c> on every other rank.</param>
        protected abstract void Initialize(bool isMaster);

        /// <summary>Runs on rank 0 only, on the thread that called <see cref="Run"/>.</summary>
        protected abstract void Main();

        /// <summary>Called on the master's polling thread for every result received from a worker.</summary>
        /// <param name="result">The result received from the worker.</param>
        protected abstract void ProcessResult(TResult result);

        /// <summary>Called on a worker for every work item received from the master.</summary>
        /// <param name="work">The work item received from the master.</param>
        /// <returns>The result that is sent back to the master.</returns>
        protected abstract TResult ProcessWork(TWork work);

        /// <summary>
        /// Called on the master's polling thread whenever a worker asks for work.
        /// </summary>
        /// <returns>The next work item, or <c>null</c> when none is available.</returns>
        protected abstract TWork GetWork();

        // Written by the thread running Run() on rank 0, read by MasterThread.
        private volatile bool terminate = false;
        private Thread thread;
        private MPI.Intracommunicator comm;

        /// <summary>
        /// Starts the MPI environment and runs either the master or the worker role,
        /// depending on this process's rank. Returns once the role's thread has finished.
        /// </summary>
        /// <param name="args">Command-line arguments, passed by reference to MPI.NET.</param>
        public void Run(string[] args)
        {
            MPI.Environment.Run(ref args, comm =>
            {
                this.comm = comm;

                if (comm.Size < 2)
                {
                    Console.WriteLine("At least 2 processes are required.");
                    return;
                }

                if (comm.Rank == MasterRank)
                {
                    Initialize(isMaster: true);

                    thread = new Thread(MasterThread);
                    thread.Start();

                    try
                    {
                        Main();
                    }
                    finally
                    {
                        // Always signal shutdown and wait for the polling thread, even when
                        // Main() throws; otherwise the workers never receive the shutdown
                        // flag and the whole job hangs.
                        terminate = true;
                        thread.Join();
                    }
                }
                else
                {
                    Initialize(isMaster: false);

                    thread = new Thread(WorkerThread);
                    thread.Start();
                    thread.Join();
                }
            });
        }

        /// <summary>
        /// Polling loop of the master. Visits every worker rank in turn, answers pending
        /// work requests and collects pending results, until every worker has been sent
        /// the shutdown flag.
        /// </summary>
        private void MasterThread()
        {
            Console.WriteLine($"MasterStart {MPI.Environment.ProcessorName}");

            // done[i] becomes true once rank i has been sent the shutdown flag.
            var done = new bool[comm.Size];
            done[MasterRank] = true;

            while (!done.All(x => x))
            {
                for (int i = 1; i < comm.Size; i++)
                {
                    // A non-null probe result is treated as "a message is waiting".
                    if (comm.ImmediateProbe(i, DispatchTag) != null)
                    {
                        Console.WriteLine($"Receive: {i}");

                        // The request carries no information; receive it only to consume it.
                        comm.Receive<int>(i, DispatchTag);

                        var work = GetWork();
                        if (work != null)
                        {
                            comm.Send(WorkFollowsFlag, i, DispatchTag);
                            comm.Send(work, i, DispatchTag);
                        }
                        else if (terminate)
                        {
                            // No work left and Main() has finished: release this worker.
                            comm.Send(ShutdownFlag, i, DispatchTag);
                            done[i] = true;
                        }
                        else
                        {
                            comm.Send(NoWorkFlag, i, DispatchTag);
                        }
                    }

                    if (comm.ImmediateProbe(i, ResultTag) != null)
                    {
                        var result = comm.Receive<TResult>(i, ResultTag);
                        ProcessResult(result);
                    }
                }

                Thread.Sleep(PollIntervalMs);
            }

            Console.WriteLine("MasterStop");
        }

        /// <summary>
        /// Request loop of a worker. Asks the master for work, then either stops,
        /// retries later, or processes the received item and returns its result.
        /// </summary>
        private void WorkerThread()
        {
            Console.WriteLine($"WorkerStart: {comm.Rank} {MPI.Environment.ProcessorName}");

            // Within this class, terminate is only ever set on rank 0, so a worker
            // leaves this loop through the shutdown flag below.
            while (!terminate)
            {
                Thread.Sleep(PollIntervalMs);
                Console.WriteLine($"Send: {comm.Rank}");

                // The payload of the request is ignored by the master.
                comm.Send(0, MasterRank, DispatchTag);

                var flag = comm.Receive<int>(MasterRank, DispatchTag);
                if (flag == ShutdownFlag)
                {
                    break;
                }

                if (flag == NoWorkFlag)
                {
                    continue;
                }

                // Any other flag value means a work item follows on the same tag.
                var work = comm.Receive<TWork>(MasterRank, DispatchTag);
                var result = ProcessWork(work);
                comm.Send(result, MasterRank, ResultTag);
            }

            Console.WriteLine($"WorkerStop: {comm.Rank}");
        }
    }

    /// <summary>
    /// Payload used by <c>MPITest</c> as both the work type and the result type of
    /// <c>MPIMasterWorkerBase</c>. Marked <c>[Serializable]</c>; presumably MPI.NET
    /// relies on this to transmit instances between processes (confirm against the
    /// MPI.NET documentation).
    /// </summary>
    [Serializable]
    public class WorkItem
    {
        /// <summary>Identifier assigned when the item is created.</summary>
        public int Id { get; set; }
    }

    /// <summary>
    /// Sample master/worker program: the master enqueues two work items per second
    /// for ten seconds, workers echo each item back as its own result.
    /// </summary>
    public class MPITest : MPIMasterWorkerBase<WorkItem, WorkItem>
    {
        // Items produced by Main() and handed out through GetWork().
        private ConcurrentQueue<WorkItem> pending = new();

        // Id given to the next item that Main() creates.
        private int nextId;

        /// <summary>Nothing to set up for either role.</summary>
        protected override void Initialize(bool isMaster)
        {
        }

        /// <summary>
        /// Produces work for ten seconds: two new items, then a one-second pause, repeated.
        /// </summary>
        protected override void Main()
        {
            var deadline = DateTime.UtcNow.AddSeconds(10);
            while (DateTime.UtcNow < deadline)
            {
                var produced = 0;
                while (produced < 2)
                {
                    var item = new WorkItem { Id = nextId++ };
                    pending.Enqueue(item);
                    produced++;
                }

                Thread.Sleep(1000);
            }
        }

        /// <summary>Returns the oldest queued item, or <c>null</c> when the queue is empty.</summary>
        protected override WorkItem GetWork() =>
            pending.TryDequeue(out var next) ? next : null;

        /// <summary>Logs the item and returns it unchanged as the result.</summary>
        protected override WorkItem ProcessWork(WorkItem work)
        {
            var message = $"Processing Work {work.Id}";
            Console.WriteLine(message);
            return work;
        }

        /// <summary>Logs the id of a result received from a worker.</summary>
        protected override void ProcessResult(WorkItem result)
        {
            var message = $"Process Result {result.Id}";
            Console.WriteLine(message);
        }
    }

    /// <summary>Process entry point; every MPI rank starts here.</summary>
    class Program
    {
        /// <summary>Creates the sample program and hands it the command-line arguments.</summary>
        static void Main(string[] args)
        {
            var app = new MPITest();
            app.Run(args);
        }
    }
}

comm.Send 会阻塞,但在等待几分钟之后,程序又会继续运行。

这些问题是由 VirtualBox Host-Only Network 适配器引起的,该适配器也安装在系统上。在网络设置中禁用此适配器解决了所有问题。