BufferOverflowException while sending data of specific size
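I have a bunch of keys and values that I want to send to our message queue by packing them into a single byte array. I build one byte array out of all the keys and values, which should always be smaller than 50K, and then send it to our message queue. There is a header, followed by the data.
Packet class: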
public final class Packet implements Closeable {
    private static final int MAX_SIZE = 50000;
    private static final int HEADER_SIZE = 36;
    private final byte dataCenter;
    private final byte recordVersion;
    private final long address;
    private final long addressFrom;
    private final long addressOrigin;
    private final byte recordsPartition;
    private final byte replicated;
    private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
    private int pendingItems = 0;

    public Packet(final RecordPartition recordPartition) {
        this.recordsPartition = (byte) recordPartition.getPartition();
        this.dataCenter = Utils.LOCATION.getDatacenter();
        this.recordVersion = 1;
        this.replicated = 0;
        final long packedAddress = new Data().packAddress();
        this.address = packedAddress;
        this.addressFrom = 0L;
        this.addressOrigin = packedAddress;
    }

    private void addHeader(final ByteBuffer buffer, final int items) {
        buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
                .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
                .put(replicated);
    }

    private void sendData() {
        if (itemBuffer.position() == 0) {
            // no data to be sent
            return;
        }
        final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
        addHeader(buffer, pendingItems);
        // below line throws "BufferOverflowException"
        buffer.put(itemBuffer);
        SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
        itemBuffer.clear();
        pendingItems = 0;
    }

    public void addAndSendJunked(final byte[] key, final byte[] data) {
        if (key.length > 255) {
            return;
        }
        final byte keyLength = (byte) key.length;
        final byte dataLength = (byte) data.length;
        final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
        final int newSize = itemBuffer.position() + additionalSize;
        if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
            sendData();
        }
        if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
            throw new AppConfigurationException("Size of single item exceeds maximum size");
        }
        final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
        final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
        // data layout
        itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
                .put(data);
        pendingItems++;
    }

    @Override
    public void close() {
        if (pendingItems > 0) {
            sendData();
        }
    }
}
In the code above, I get a java.nio.BufferOverflowException at buffer.put(itemBuffer); in the sendData method. I can't understand why this exception occurs or how to fix it.
This is how I call the code:
Packet packet = new Packet(partition);
packet.addAndSendJunked("hello".getBytes(StandardCharsets.UTF_8), StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8));
packet.close();
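Clearly, the length of itemBuffer plus the headers already put into buffer exceeds buffer.capacity(), i.e. MAX_SIZE.
So either there is a bug in addHeader(), or in whatever populates itemBuffer, or MAX_SIZE is too small. Keep in mind that the length that matters here is itemBuffer.remaining(), i.e. everything from its current position up to its limit, not the number of bytes written into it so far.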
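To make this concrete, here is a minimal sketch that reproduces the call from the question: one item with the 5-byte key "hello" and empty data, which occupies 17 bytes. It assumes the header is the 36 bytes that addHeader() writes; FlipDemo, newItemBuffer and copyBehindHeader are hypothetical names used only for illustration. In sendData(), itemBuffer is still in write mode (position 17, limit 50000) when it is passed to put, so remaining() is 49983, which is more than the 49964 bytes left behind the header. Calling flip() before the copy looks like the missing step.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FlipDemo {
    static final int MAX_SIZE = 50000;  // same value as in the question
    static final int HEADER_SIZE = 36;  // 1+1+4+4+8+8+8+1+1 bytes, as written by addHeader()

    /** Builds an item buffer holding one record: 5-byte key, empty data (17 bytes total). */
    public static ByteBuffer newItemBuffer() {
        byte[] key = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer items = ByteBuffer.allocate(MAX_SIZE);
        items.put((byte) 0).put((byte) key.length).put(key)
                .putLong(0L).putShort((short) 0);
        return items;
    }

    /** Copies the items behind a placeholder header and returns the packet length. */
    public static int copyBehindHeader(ByteBuffer items, boolean flipFirst) {
        ByteBuffer packet = ByteBuffer.allocate(MAX_SIZE);
        packet.position(HEADER_SIZE); // stand-in for addHeader(): skip 36 header bytes
        if (flipFirst) {
            items.flip(); // limit = bytes written, position = 0
        }
        packet.put(items); // copies items.remaining() bytes, throws if they do not fit
        return packet.position();
    }

    public static void main(String[] args) {
        try {
            copyBehindHeader(newItemBuffer(), false);
        } catch (BufferOverflowException e) {
            System.out.println("without flip(): BufferOverflowException");
        }
        System.out.println("with flip(): " + copyBehindHeader(newItemBuffer(), true) + " bytes");
    }
}
```

Without flip(), the exception only shows up while fewer than 36 bytes of items are pending, which would explain why it depends on the data size. With more pending data the copy succeeds but transfers the unwritten tail of itemBuffer instead of the items.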
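Note:
SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
Here you are enqueuing the entire MAX_SIZE-byte backing array of buffer, even if you have not put MAX_SIZE bytes into it. It would be better to pass buffer itself to this method and avoid the continual wrapping in your code.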
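If the queue API can only accept a byte[], one option is to hand it just the bytes that were written. The following is a small sketch under that assumption; TrimDemo and writtenBytes are hypothetical names, and nothing here reflects the real SendRecord API.

```java
import java.nio.ByteBuffer;

final class TrimDemo {
    /** Returns a copy of the bytes written so far, leaving the buffer itself untouched. */
    public static byte[] writtenBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // shares content, has its own position and limit
        view.flip();                          // limit = bytes written, position = 0
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(50000);
        buffer.put(new byte[53]); // e.g. a 36-byte header plus one 17-byte item
        System.out.println("backing array: " + buffer.array().length + " bytes");
        System.out.println("written bytes: " + writtenBytes(buffer).length + " bytes");
    }
}
```

In sendData() this would mean passing writtenBytes(buffer) instead of buffer.array(), so a packet with one small item is 53 bytes on the wire rather than 50000.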