零拷贝接收和消息大小

Zero-copy receive and message size

在使用零拷贝接收时,有没有办法查询接收到的消息的大小?

有了这个我(尝试)实现零拷贝:

zmq_recv(sock, buf, sizeof(buf), 0);

我也试过:

zmq_msg_t msg;
zmq_msg_init_data (&msg, buf, sizeof(buf), nullptr, NULL);
zmq_msg_recv(&msg, sock, 0);
size_t len = zmq_msg_size(&msg);

这 returns 正确大小,但不填充 buf。我认为 zmq_msg_init_data 不适用于 zmq_msg_recv 并且消息会在接收时重建。

在零拷贝上引用 guide

There is no way to do zero-copy on receive: ZeroMQ delivers you a buffer that you can store as long as you wish, but it will not write data directly into application buffers.

零拷贝只用于发送,不用于接收。

哦还有zmq_recvreturns收到的字节数。

如果您使用 JeroMQ(Pure Java for ZeroMQ),您可以通过 ZMQ.Socket.setMsgAllocator

在接收端实现零拷贝

一般来说,我们只关心大对象的零拷贝,所以你可以设置自定义消息分配器 例如:

class MappedMemoryMsgAllocator implements MsgAllocator {

static final MsgAllocatorHeap heap = new MsgAllocatorHeap();
private final int threshold;

public MappedMemoryMsgAllocator(int threshold) {
    this.threshold = threshold;
}

public MappedMemoryMsgAllocator() {
    this(1024 * 1024 * 4);
}

@Override
public Msg allocate(int size) {
    if ((threshold > 0) && (size > threshold)) {
        try {
            return new Msg(UtilsJNI.nativeMapMemory(size, UtilsJNI.PROT_WRITE, UtilsJNI.MAP_PRIVATE));
        } catch (IOException e) {
            Log.d(TAG, "allocate:: fail to mmap", e);
            return null;
        }
    } else {
        return heap.allocate(size);
    }
}

public static void forceFreeMsgArray(zmq.Msg[] msgs) {
    if (msgs != null) {
        for (zmq.Msg msg : msgs) {
            final ByteBuffer buf = msg.buf();
            if (buf.isDirect()) {
                try {
                    UtilsJNI.nativeUnmapMemory(buf);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

本机 C++ 代码:

extern "C"
JNIEXPORT jobject
UtilsJNI_nativeMapMemory(JNIEnv *env, jobject clazz, jlong size,
                                              jint protection, jint mapFlags) {
    void *pAddr = mmap64(NULL, size, protection, mapFlags|MAP_ANONYMOUS, -1, 0);
    if (pAddr == NULL) {
        env->ThrowNew(env->FindClass("java/io/IOException"), "fail to mmap");
        return NULL;
    } else {
        return env->NewDirectByteBuffer(pAddr, size);
    }
}

extern "C"
JNIEXPORT void
UtilsJNI_nativeUnmapMemory(JNIEnv *env, jobject clazz, jobject buffer) {
    if (munmap(env->GetDirectBufferAddress(buffer), env->GetDirectBufferCapacity(buffer)) < 0) {
        env->ThrowNew(env->FindClass("java/io/IOException"), "fail to munmap");
    }
}