Java NIO UDP 多播 - 丢弃的数据包
Java NIO UDP Multicast - dropped packets
我们有一个使用 UDP 多播发送日志事件的日志系统。事件发生率约为 10,000 events/sec,平均事件大小约为 2Kb。
应用程序的 NIO 版本在每次测试期间都会丢失一小部分事件(大约 12M 中约有 2000 个事件)。有没有人在这方面有任何见解?
示例代码:
没有 NIO:
byte[] buf = new byte[65535];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
try {
while (!Thread.currentThread().isInterrupted()) {
socket.receive(packet);
final byte[] tmpBuffer = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, tmpBuffer, 0,
tmpBuffer.length);
insertToNonBlockingQueue(tmpBuffer, packet.getSocketAddress());
}
} catch (Throwable t) {
throw new RuntimeException("Encountered exception in Acceptor", t);
} finally {
Util.closeQuietly(socket);
}
使用蔚来:
ByteBuffer inBuffer = ByteBuffer.allocate(65535);
try {
while (!Thread.currentThread().isInterrupted()) {
SocketAddress addr = channel.receive(inBuffer);
inBuffer.flip();
final byte[] tmpBuffer = new byte[inBuffer.limit()];
inBuffer.get(tmpBuffer);
inBuffer.clear();
insertToNonBlockingQueue(tmpBuffer, addr);
}
} catch (ClosedByInterruptException ex) {
log.info("Channel closed by interrupt"); // normal shutdown
} catch (Throwable t) {
throw new RuntimeException("Encountered exception in Acceptor", t);
} finally {
Util.closeQuietly(channel);
}
两个侦听器同时 运行,每次非 NIO 版本捕获所有日志事件,而 NIO 版本会遗漏一些。这不是网络问题,因为即使我们将代码切换到机器上的其他版本也是相同的行为。
您忘记了 compact()
或 clear()
get()
之后的缓冲区。此代码将在缓冲区填满后立即开始丢弃数据包。
DatagramPacket
案例应该在每次接收前重置数据包长度。
将实际的 DatagramPacket
插入队列并在每次接收时使用一个新的,或者在 NIO 的情况下合成一个新的会更简单。这样你就不需要新的数据结构了。
除了EJP所说的,你应该使用直接字节缓冲区作为读取缓冲区,否则套接字将在内部分配一个DBB,然后从中复制到你的BB中,然后你再从中复制到数组中. IE。有一个多余的复制操作。
此外,您可能希望将套接字的接收缓冲区配置为可以容纳多个数据包的大小。
我们有一个使用 UDP 多播发送日志事件的日志系统。事件发生率约为 10,000 events/sec,平均事件大小约为 2Kb。
应用程序的 NIO 版本在每次测试期间都会丢失一小部分事件(大约 12M 中约有 2000 个事件)。有没有人在这方面有任何见解?
示例代码: 没有 NIO:
byte[] buf = new byte[65535];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
try {
while (!Thread.currentThread().isInterrupted()) {
socket.receive(packet);
final byte[] tmpBuffer = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, tmpBuffer, 0,
tmpBuffer.length);
insertToNonBlockingQueue(tmpBuffer, packet.getSocketAddress());
}
} catch (Throwable t) {
throw new RuntimeException("Encountered exception in Acceptor", t);
} finally {
Util.closeQuietly(socket);
}
使用蔚来:
ByteBuffer inBuffer = ByteBuffer.allocate(65535);
try {
while (!Thread.currentThread().isInterrupted()) {
SocketAddress addr = channel.receive(inBuffer);
inBuffer.flip();
final byte[] tmpBuffer = new byte[inBuffer.limit()];
inBuffer.get(tmpBuffer);
inBuffer.clear();
insertToNonBlockingQueue(tmpBuffer, addr);
}
} catch (ClosedByInterruptException ex) {
log.info("Channel closed by interrupt"); // normal shutdown
} catch (Throwable t) {
throw new RuntimeException("Encountered exception in Acceptor", t);
} finally {
Util.closeQuietly(channel);
}
两个侦听器同时 运行,每次非 NIO 版本捕获所有日志事件,而 NIO 版本会遗漏一些。这不是网络问题,因为即使我们将代码切换到机器上的其他版本也是相同的行为。
您忘记了 compact()
或 clear()
get()
之后的缓冲区。此代码将在缓冲区填满后立即开始丢弃数据包。
DatagramPacket
案例应该在每次接收前重置数据包长度。
将实际的 DatagramPacket
插入队列并在每次接收时使用一个新的,或者在 NIO 的情况下合成一个新的会更简单。这样你就不需要新的数据结构了。
除了EJP所说的,你应该使用直接字节缓冲区作为读取缓冲区,否则套接字将在内部分配一个DBB,然后从中复制到你的BB中,然后你再从中复制到数组中. IE。有一个多余的复制操作。
此外,您可能希望将套接字的接收缓冲区配置为可以容纳多个数据包的大小。