Zero-copy receive and message size
When using zero-copy receive, is there a way to query the size of the received message?
This is how I (try to) implement zero-copy:
zmq_recv(sock, buf, sizeof(buf), 0);
I have also tried:
zmq_msg_t msg;
zmq_msg_init_data (&msg, buf, sizeof(buf), nullptr, NULL);
zmq_msg_recv(&msg, sock, 0);
size_t len = zmq_msg_size(&msg);
This returns the correct size, but it does not fill buf. I think zmq_msg_init_data is not meant to be used with zmq_msg_recv, and the message gets rebuilt on receive.
To quote the guide on zero-copy:
There is no way to do zero-copy on receive: ZeroMQ delivers you a buffer that you can store as long as you wish, but it will not write data directly into application buffers.
Zero-copy is for sending only, not for receiving.
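Oh, and zmq_recv returns the size of the received message in bytes. Note that this value can be larger than the buffer length you passed in, in which case the message was truncated to fit.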
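Because that return value is the full message size rather than the number of bytes stored, it helps to interpret it explicitly. The following is a small sketch in plain C; stored_len and was_truncated are hypothetical helper names, and rc stands for whatever zmq_recv(sock, buf, cap, 0) returned.

```c
#include <stddef.h>

/* Bytes actually present in the buffer after zmq_recv returned rc for a
 * buffer of capacity cap. A negative rc means the call failed. */
static size_t stored_len(int rc, size_t cap)
{
    if (rc < 0)
        return 0;
    return (size_t)rc < cap ? (size_t)rc : cap;
}

/* Non-zero when the message was larger than the buffer, so its tail
 * was dropped. */
static int was_truncated(int rc, size_t cap)
{
    return rc >= 0 && (size_t)rc > cap;
}
```

With the call from the question, stored_len(rc, sizeof(buf)) gives the number of valid bytes in buf, and was_truncated(rc, sizeof(buf)) tells you whether a larger buffer was needed.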
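If you are using JeroMQ (Pure Java for ZeroMQ), you can get zero-copy on the receiving side through ZMQ.Socket.setMsgAllocator.
In general we only care about zero-copy for large objects, so you can install a custom message allocator, for example: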
class MappedMemoryMsgAllocator implements MsgAllocator {
    // Tag for the Log.d call below.
    private static final String TAG = "MappedMemoryMsgAllocator";
    static final MsgAllocatorHeap heap = new MsgAllocatorHeap();
    // Messages larger than this many bytes are backed by mmap'ed memory.
    private final int threshold;

    public MappedMemoryMsgAllocator(int threshold) {
        this.threshold = threshold;
    }

    public MappedMemoryMsgAllocator() {
        this(1024 * 1024 * 4); // default threshold: 4 MiB
    }

    @Override
    public Msg allocate(int size) {
        if ((threshold > 0) && (size > threshold)) {
            try {
                // Large message: wrap a direct ByteBuffer over an anonymous mapping.
                return new Msg(UtilsJNI.nativeMapMemory(size, UtilsJNI.PROT_WRITE, UtilsJNI.MAP_PRIVATE));
            } catch (IOException e) {
                Log.d(TAG, "allocate:: fail to mmap", e);
                return null;
            }
        } else {
            // Small message: use the regular heap allocator.
            return heap.allocate(size);
        }
    }

    // Unmap the native memory behind every direct (mmap-backed) message.
    public static void forceFreeMsgArray(zmq.Msg[] msgs) {
        if (msgs != null) {
            for (zmq.Msg msg : msgs) {
                final ByteBuffer buf = msg.buf();
                if (buf.isDirect()) {
                    try {
                        UtilsJNI.nativeUnmapMemory(buf);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
Native C++ code:
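#include <jni.h>
#include <sys/mman.h>

// These names are only resolved automatically if they follow the
// Java_<package>_UtilsJNI_<method> convention; otherwise they have to be
// registered with RegisterNatives.
extern "C"
JNIEXPORT jobject JNICALL
UtilsJNI_nativeMapMemory(JNIEnv *env, jobject clazz, jlong size,
                         jint protection, jint mapFlags) {
    void *pAddr = mmap64(NULL, size, protection, mapFlags | MAP_ANONYMOUS, -1, 0);
    // mmap signals failure with MAP_FAILED, not NULL.
    if (pAddr == MAP_FAILED) {
        env->ThrowNew(env->FindClass("java/io/IOException"), "fail to mmap");
        return NULL;
    } else {
        // Expose the mapping to Java as a direct ByteBuffer.
        return env->NewDirectByteBuffer(pAddr, size);
    }
}

extern "C"
JNIEXPORT void JNICALL
UtilsJNI_nativeUnmapMemory(JNIEnv *env, jobject clazz, jobject buffer) {
    // Release the mapping that backs the direct buffer.
    if (munmap(env->GetDirectBufferAddress(buffer), env->GetDirectBufferCapacity(buffer)) < 0) {
        env->ThrowNew(env->FindClass("java/io/IOException"), "fail to munmap");
    }
}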
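For reference, the map/unmap pair the native code relies on can be sketched in plain C, without the JNI layer. This assumes a POSIX system that supports MAP_ANONYMOUS (Linux, for instance); map_anonymous and unmap_anonymous are hypothetical names used only for illustration. The detail that matters is that mmap reports failure with MAP_FAILED rather than NULL.

```c
/* MAP_ANONYMOUS is hidden under strict -std=c11 on glibc without this. */
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif
#include <stddef.h>
#include <sys/mman.h>

/* Map size bytes of private, zero-filled, readable and writable memory.
 * Returns NULL on failure so callers can use an ordinary NULL check. */
static void *map_anonymous(size_t size)
{
    void *p = mmap(NULL, size, PROT_READ | PROT_WRITE,
                   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    return p == MAP_FAILED ? NULL : p;
}

/* Release a region obtained from map_anonymous; size must be the same
 * value that was passed when mapping. Returns 0 on success, -1 on error. */
static int unmap_anonymous(void *p, size_t size)
{
    return munmap(p, size);
}
```

Requesting PROT_READ together with PROT_WRITE is a deliberate choice in this sketch: the receiver writes into the region and the application reads it back, and POSIX does not promise that write permission alone implies read access.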