Java 获取 MulticastSocket.receive 以抛出 ClosedByInterruptException

Java Get MulticastSocket.receive to Throw ClosedByInterruptException

我有一些代码可以从多播套接字读取数据,直到用户定义的结束时间。如果线程通过调用 Thread.interrupt (或您可以想到的任何其他用户启动的操作)中断,我也想停止读取数据。我不知道如何在线程中断时获得通知。现有代码如下:

// These are the constants I am given
final int         mcastPort = ...;
final InetAddress mcastIP   = ...;
final InetAddress ifaceIP   = ...; // Null indicates all interfaces should be used
final Instant     endTime   = ...; // Time at which to stop processing

// Initialize a datagram
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// Process incoming datagram packets
try (final MulticastSocket socket = new MulticastSocket(port)) {
    socket.joinGroup(mcastIP);
    if (ifaceIP != null)
        socket.setInterface(ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        socket.setSoTimeout(soTimeout);
        socket.receive(packet);

        // Process packet
        ...
    } while (true);
} catch (final SocketTimeoutException e) {
    // Normal condition... the recording time has ended
} catch (final ClosedByInterruptException e) {
    // Uh-oh... this never happens
} ...

我看到有一个DatagramSocket.getChannel方法returns一个DatagramChannel,所以我很自然地假设该类型用于read/write底层套接字。这个假设是不正确的,这意味着 MulticastSocket 没有实现 InterruptibleChannel。因此,MulticastSocket.receive 永远不会抛出 ClosedByInterruptException.

我已经在网上搜索示例,但无法弄清楚如何修改上面的代码以使用 DatagramChannel 而不是 MulticastSocket。我需要帮助的问题是:

  1. 如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?
  2. 如何将 InetAddress 转换为 NetworkInterface 对象?

以下是我对如何将我的实现从 MulticastSocket 转换为 DatagramChannel 以满足我的要求的最佳猜测:

// Initialize a buffer
final ByteBuffer buffer = ByteBuffer.allocate(1000);

try (final DatagramChannel mcastChannel = DatagramChannel.open()) {
    mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    mcastChannel.connect(new InetSocketAddress(port));
    mcastChannel.join(mcastIP);
    if (ifaceIP != null)
        // HELP: this option requires an InterfaceAddress object,
        //       but I only have access to an InetAddress object
        mcastChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        // HELP: SO_TIMEOUT is not a member of StandardSocketOptions
        mcastChannel.setOption(SO_TIMEOUT, ???);
        mcastChannel.receive(buffer);

        // Process packet
        ...
    } while (true);
} ...

这种方法行得通吗? DatagramChannel.receive 没有将 SocketTimeoutException 列为它能够抛出的异常之一。如果这可行,那么请告诉我如何将第二个实现更改为与第一个等效,但能够在客户端调用 Thread.interrupt 时抛出 ClosedByInterruptException。如果没有,那么对于我如何满足在预定义时间停止数据报接收的要求,同时还提供一种通过用户交互停止执行的方法,是否有人有任何其他想法?

How do I set the SO_TIMEOUT parameter on a DatagramChannel?

通过调用 channel.socket().setSoTimeout().

How do I convert an InetAddress into a NetworkInterface object?

您枚举网络接口,直到找到具有所需地址的接口。

DatagramChannel.receive() doesn't list SocketTimeoutException

不必。它列出了 IOException,并且 SocketTimeoutException 扩展了 IOException.

你的第二段代码应该调用 bind(),而不是 connect(),并且它应该在调用 join() 之前设置接口,而不是之后。除此之外,一旦您修复了网络接口问题,它应该会按预期工作,如果中断,它会抛出 ClosedByInterruptException

最终的工作代码是:

// Create a datagram packet used to read multicast data from the socket
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// Process incoming datagram packets
try (final DatagramChannel mcastChannel =
        DatagramChannel.open(StandardProtocolFamily.INET)) {

    // Set the appropriate parameters on the socket
    mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    mcastChannel.bind(new InetSocketAddress(port));
    if (ifaceIP == null) {
        // Call join on each NetworkInterface that supports IPv4 multicast
    } else {
        mcastChannel.join(mcastIP, NetworkInterface.getByInetAddress(ifaceIP));
    }

    final DatagramSocket socket = mcastChannel.socket();

    do {
        final Duration timeToEnd = Duration.between(Instant.now(), endTime);
        if (timeToEnd.compareTo(Duration.ZERO) < 0) break;

        socket.setSoTimeout((int)timeToEnd.toMillis());
        socket.receive(packet);

        // Process packet
        ...
    } while (true);
} catch (final SocketTimeoutException e) {
    // The end time has passed
} catch (final ClosedByInterruptException e) {
    // The user initiated the closure
} ...

注意事项:

  1. socket.receive的调用。如果将其更改为 mcastChannel.receive,您将永远不会得到 SocketTimeoutException.
  2. bind 的调用发生在对 join 的调用之前。您可以在任意多个不同的网络接口上任意多次调用 join