Java 获取 MulticastSocket.receive 以抛出 ClosedByInterruptException
Java Get MulticastSocket.receive to Throw ClosedByInterruptException
我有一些代码可以从多播套接字读取数据,直到用户定义的结束时间。如果线程通过调用 Thread.interrupt
(或您可以想到的任何其他用户启动的操作)中断,我也想停止读取数据。我不知道如何在线程中断时获得通知。现有代码如下:
// These are the constants I am given
final int mcastPort = ...;
final InetAddress mcastIP = ...;
final InetAddress ifaceIP = ...; // Null indicates all interfaces should be used
final Instant endTime = ...; // Time at which to stop processing
// Initialize a datagram
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// Process incoming datagram packets
try (final MulticastSocket socket = new MulticastSocket(port)) {
socket.joinGroup(mcastIP);
if (ifaceIP != null)
socket.setInterface(ifaceIP);
do {
final Duration soTimeout = Duration.between(Instant.now(), endTime);
socket.setSoTimeout(soTimeout);
socket.receive(packet);
// Process packet
...
} while (true);
} catch (final SocketTimeoutException e) {
// Normal condition... the recording time has ended
} catch (final ClosedByInterruptException e) {
// Uh-oh... this never happens
} ...
我看到有一个DatagramSocket.getChannel
方法returns一个DatagramChannel
,所以我很自然地假设该类型用于read/write底层套接字。这个假设是不正确的,这意味着 MulticastSocket
没有实现 InterruptibleChannel
。因此,MulticastSocket.receive
永远不会抛出 ClosedByInterruptException
.
我已经在网上搜索示例,但无法弄清楚如何修改上面的代码以使用 DatagramChannel
而不是 MulticastSocket
。我需要帮助的问题是:
- 如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?
- 如何将
InetAddress
转换为 NetworkInterface
对象?
以下是我对如何将我的实现从 MulticastSocket
转换为 DatagramChannel
以满足我的要求的最佳猜测:
// Initialize a buffer
final ByteBuffer buffer = ByteBuffer.allocate(1000);
try (final DatagramChannel mcastChannel = DatagramChannel.open()) {
mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
mcastChannel.connect(new InetSocketAddress(port));
mcastChannel.join(mcastIP);
if (ifaceIP != null)
// HELP: this option requires an InterfaceAddress object,
// but I only have access to an InetAddress object
mcastChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ifaceIP);
do {
final Duration soTimeout = Duration.between(Instant.now(), endTime);
// HELP: SO_TIMEOUT is not a member of StandardSocketOptions
mcastChannel.setOption(SO_TIMEOUT, ???);
mcastChannel.receive(buffer);
// Process packet
...
} while (true);
} ...
这种方法行得通吗? DatagramChannel.receive
没有将 SocketTimeoutException
列为它能够抛出的异常之一。如果这可行,那么请告诉我如何将第二个实现更改为与第一个等效,但能够在客户端调用 Thread.interrupt
时抛出 ClosedByInterruptException
。如果没有,那么对于我如何满足在预定义时间停止数据报接收的要求,同时还提供一种通过用户交互停止执行的方法,是否有人有任何其他想法?
How do I set the SO_TIMEOUT parameter on a DatagramChannel?
通过调用 channel.socket().setSoTimeout()
.
How do I convert an InetAddress
into a NetworkInterface
object?
您枚举网络接口,直到找到具有所需地址的接口。
DatagramChannel.receive()
doesn't list SocketTimeoutException
不必。它列出了 IOException
,并且 SocketTimeoutException
扩展了 IOException
.
你的第二段代码应该调用 bind()
,而不是 connect()
,并且它应该在调用 join()
之前设置接口,而不是之后。除此之外,一旦您修复了网络接口问题,它应该会按预期工作,如果中断,它会抛出 ClosedByInterruptException
。
最终的工作代码是:
// Create a datagram packet used to read multicast data from the socket
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// Process incoming datagram packets
try (final DatagramChannel mcastChannel =
DatagramChannel.open(StandardProtocolFamily.INET)) {
// Set the appropriate parameters on the socket
mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
mcastChannel.bind(new InetSocketAddress(port));
if (ifaceIP == null) {
// Call join on each NetworkInterface that supports IPv4 multicast
} else {
mcastChannel.join(mcastIP, NetworkInterface.getByInetAddress(ifaceIP));
}
final DatagramSocket socket = mcastChannel.socket();
do {
final Duration timeToEnd = Duration.between(Instant.now(), endTime);
if (timeToEnd.compareTo(Duration.ZERO) < 0) break;
socket.setSoTimeout((int)timeToEnd.toMillis());
socket.receive(packet);
// Process packet
...
} while (true);
} catch (final SocketTimeoutException e) {
// The end time has passed
} catch (final ClosedByInterruptException e) {
// The user initiated the closure
} ...
注意事项:
- 对
socket.receive
的调用。如果将其更改为 mcastChannel.receive
,您将永远不会得到 SocketTimeoutException
.
- 对
bind
的调用发生在对 join
的调用之前。您可以在任意多个不同的网络接口上任意多次调用 join
。
我有一些代码可以从多播套接字读取数据,直到用户定义的结束时间。如果线程通过调用 Thread.interrupt
(或您可以想到的任何其他用户启动的操作)中断,我也想停止读取数据。我不知道如何在线程中断时获得通知。现有代码如下:
// These are the constants I am given
final int mcastPort = ...;
final InetAddress mcastIP = ...;
final InetAddress ifaceIP = ...; // Null indicates all interfaces should be used
final Instant endTime = ...; // Time at which to stop processing
// Initialize a datagram
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// Process incoming datagram packets
try (final MulticastSocket socket = new MulticastSocket(port)) {
socket.joinGroup(mcastIP);
if (ifaceIP != null)
socket.setInterface(ifaceIP);
do {
final Duration soTimeout = Duration.between(Instant.now(), endTime);
socket.setSoTimeout(soTimeout);
socket.receive(packet);
// Process packet
...
} while (true);
} catch (final SocketTimeoutException e) {
// Normal condition... the recording time has ended
} catch (final ClosedByInterruptException e) {
// Uh-oh... this never happens
} ...
我看到有一个DatagramSocket.getChannel
方法returns一个DatagramChannel
,所以我很自然地假设该类型用于read/write底层套接字。这个假设是不正确的,这意味着 MulticastSocket
没有实现 InterruptibleChannel
。因此,MulticastSocket.receive
永远不会抛出 ClosedByInterruptException
.
我已经在网上搜索示例,但无法弄清楚如何修改上面的代码以使用 DatagramChannel
而不是 MulticastSocket
。我需要帮助的问题是:
- 如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?
- 如何将
InetAddress
转换为NetworkInterface
对象?
以下是我对如何将我的实现从 MulticastSocket
转换为 DatagramChannel
以满足我的要求的最佳猜测:
// Initialize a buffer
final ByteBuffer buffer = ByteBuffer.allocate(1000);
try (final DatagramChannel mcastChannel = DatagramChannel.open()) {
mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
mcastChannel.connect(new InetSocketAddress(port));
mcastChannel.join(mcastIP);
if (ifaceIP != null)
// HELP: this option requires an InterfaceAddress object,
// but I only have access to an InetAddress object
mcastChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ifaceIP);
do {
final Duration soTimeout = Duration.between(Instant.now(), endTime);
// HELP: SO_TIMEOUT is not a member of StandardSocketOptions
mcastChannel.setOption(SO_TIMEOUT, ???);
mcastChannel.receive(buffer);
// Process packet
...
} while (true);
} ...
这种方法行得通吗? DatagramChannel.receive
没有将 SocketTimeoutException
列为它能够抛出的异常之一。如果这可行,那么请告诉我如何将第二个实现更改为与第一个等效,但能够在客户端调用 Thread.interrupt
时抛出 ClosedByInterruptException
。如果没有,那么对于我如何满足在预定义时间停止数据报接收的要求,同时还提供一种通过用户交互停止执行的方法,是否有人有任何其他想法?
How do I set the SO_TIMEOUT parameter on a DatagramChannel?
通过调用 channel.socket().setSoTimeout()
.
How do I convert an
InetAddress
into aNetworkInterface
object?
您枚举网络接口,直到找到具有所需地址的接口。
DatagramChannel.receive()
doesn't listSocketTimeoutException
不必。它列出了 IOException
,并且 SocketTimeoutException
扩展了 IOException
.
你的第二段代码应该调用 bind()
,而不是 connect()
,并且它应该在调用 join()
之前设置接口,而不是之后。除此之外,一旦您修复了网络接口问题,它应该会按预期工作,如果中断,它会抛出 ClosedByInterruptException
。
最终的工作代码是:
// Create a datagram packet used to read multicast data from the socket
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// Process incoming datagram packets
try (final DatagramChannel mcastChannel =
DatagramChannel.open(StandardProtocolFamily.INET)) {
// Set the appropriate parameters on the socket
mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
mcastChannel.bind(new InetSocketAddress(port));
if (ifaceIP == null) {
// Call join on each NetworkInterface that supports IPv4 multicast
} else {
mcastChannel.join(mcastIP, NetworkInterface.getByInetAddress(ifaceIP));
}
final DatagramSocket socket = mcastChannel.socket();
do {
final Duration timeToEnd = Duration.between(Instant.now(), endTime);
if (timeToEnd.compareTo(Duration.ZERO) < 0) break;
socket.setSoTimeout((int)timeToEnd.toMillis());
socket.receive(packet);
// Process packet
...
} while (true);
} catch (final SocketTimeoutException e) {
// The end time has passed
} catch (final ClosedByInterruptException e) {
// The user initiated the closure
} ...
注意事项:
- 对
socket.receive
的调用。如果将其更改为mcastChannel.receive
,您将永远不会得到SocketTimeoutException
. - 对
bind
的调用发生在对join
的调用之前。您可以在任意多个不同的网络接口上任意多次调用join
。