尽管绑定/连接地址匹配,但为什么我会间歇性地收到 PortUnreachableException?

Why am I intermittently receiving PortUnreachableException despite bind / connect addresses matching up?

我有一个小的概念验证,我试图通过不同的端口路由 UDP 流量,而任何一方都不知道端口转换。如果我禁用 connect() 调用并只执行 'fire-and-forget' 数据报,我就能够使它正常工作。此外,我的代码即使在调用 connect() 时也能正常工作,但它似乎会选择特定的流量并抛出 PortUnreachableException。

实际 server/client(我路由的流量)可能会双向发送流量。这些流量中的一些可以正常通过,但有些则不能,并导致无法访问的端口错误。我已经能够重现这个有问题的 server/client 行为,并且我已经使用一个名为 run_buggySend() 的简单方法将该实现烘焙到我下面的 PoC 代码中,您将在下面看到。

我倒着看了看代码,看不出为什么会抛出异常。我知道大多数人使用 bind() 或 connect() 但不是两者都使用,但我的用例需要两者,因为我正在路由的某些协议需要非常特定的端口用于 both 方向的流量。此外,虽然我可以去掉 connect() 调用以使其工作,但我喜欢缓存路由信息以提高性能的想法,并且不明白为什么它不兼容。然后,如果没有 bind(),从另一个方向到达硬编码端口的流量永远不会到达我的 DatagramChannel。

我也想知道这是否会遇到一些错误的 Java 代码,因为端口不可达异常甚至没有从更相关/正确的 DatagramChannel 中抛出。如果我将触发数据报发送到 3000,则 13001 通道会抛出错误。同样,如果我将它发送到 13001,3000 通道会抛出错误。

也就是说,我将通过下面的最小复制代码。感谢您提供任何提示或帮助!

import java.io.IOException;
import java.math.BigInteger;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ByteChannel;
import java.nio.charset.StandardCharsets;

public class ByteChannelUDPBridge implements Runnable
{
    private final ByteBuffer buff = ByteBuffer.allocateDirect(4096);

    private final ByteChannelUDP channelA;
    private final ByteChannelUDP channelB;

    private final int portBase = 3000;
    private final int portOffset = 10000;

    public ByteChannelUDPBridge() throws IOException
    {
        InetAddress loopback = InetAddress.getLoopbackAddress();

        channelA = new ByteChannelUDP(
            new InetSocketAddress(loopback, portBase),                      //    bind=3000
            new InetSocketAddress(loopback, portBase + 1));                 // connect=3001
        channelB = new ByteChannelUDP(
            new InetSocketAddress(loopback, portOffset + portBase + 1),     //    bind=13001
            new InetSocketAddress(loopback, portOffset + portBase));        // connect=13000
    }

    public void transfer(ByteChannel chanIn, ByteChannel chanOut) throws IOException
    {
        try
        {
            chanIn.read(buff);

            if (buff.position() > 0)
            {
                System.out.print(chanIn + " recieved " + buff.position() + " bytes: " + buff.position() + " bytes: ");

                buff.flip();
                logToHex(StandardCharsets.UTF_8.decode(buff));
                buff.rewind();
                chanOut.write(buff);
                buff.clear();
            }
        }
        catch (PortUnreachableException ex)
        {
            System.err.println(chanIn + ": " + ex);
        }
    }

    public void run()
    {
        try
        {
            while (true)
            {
                transfer(channelA, channelB); // 3000  >> B:13001 -> C:13000
                transfer(channelB, channelA); // 13001 >> B:3000  -> C:3001
                Thread.sleep(1);
            }
        }
        catch (IOException | InterruptedException ex) { ex.printStackTrace(); }
        finally
        {
            try
            {
                channelA.close();
                channelB.close();
            }
            catch (IOException dontCare) { dontCare.printStackTrace(); }
        }
    }

    public void run_buggySend()
    {
        try
        {
            DatagramSocket sock = new DatagramSocket(3001); // 13000
            InetSocketAddress recip = new InetSocketAddress(InetAddress.getLoopbackAddress(), 3000); // 13001
            byte[] buffer = new byte[]{1, 2, 3};

            while (true)
            {
                sock.send(new DatagramPacket(buffer, buffer.length, recip));
                Thread.sleep(15000);
            }
        }
        catch (IOException | InterruptedException ex)
        {
            ex.printStackTrace();
        }
    }

    public void logToHex(CharBuffer arg) { System.out.format("%x\n", new BigInteger(1, arg.toString().getBytes())); }

    public static void main(String[] args) throws Exception
    {
        ByteChannelUDPBridge listener = new ByteChannelUDPBridge();
        Thread t1 = new Thread(listener);
        t1.start();

        Thread t2 = new Thread(listener::run_buggySend);
        t2.start();
    }
}

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.DatagramChannel;

public class ByteChannelUDP implements ByteChannel
{
    private final DatagramChannel channel;

    private final SocketAddress bind, connect;

    public ByteChannelUDP(InetSocketAddress addrBind, InetSocketAddress addrConnect) throws IOException
    {
        bind = addrBind;
        connect = addrConnect;
        channel = DatagramChannel.open();
        channel.configureBlocking(false);
        channel.bind(bind);
        channel.connect(connect);
    }

    @Override
    public int read(ByteBuffer dst) throws IOException
    {
        //return channel.read(dst); // this also causes the unreachable exception
        int before = dst.remaining();
        SocketAddress rec = channel.receive(dst);
        return dst.remaining() - before;
    }

    @Override
    public int write(ByteBuffer src) throws IOException
    {
        //return channel.write(src); // not used unless i can get read() to work
        return channel.send(src, connect);
    }

    @Override
    public boolean isOpen() { return channel.isOpen(); }

    @Override
    public void close() throws IOException { channel.close(); }

    @Override
    public String toString()
    {
        return bind.toString() + " <> " + connect.toString();
    }
}

偶然发现问题和解决方案。虽然 recieve() / read() 方法抛出 PortUnreachableException,实际上是 send() / write() 触发了错误。

其中一侧的接收器显然配置不正确且不够容易修复,因此可以理解,中间写入将是出现错误的地方。

然而,由于某些难以理解的原因,Java 决定在 read() 方法中提醒调用者注意无法传递的数据报更有意义,尽管事实上它是 write() 失败了。此外,似乎也没有记录这种行为。 Java 应该在下一次写入时或在单独的 'error polling' 方法中抛出异常,而不是从与写入操作失败几乎完全无关的读取操作中抛出异常。