Readable node stream to native c++ addon InputStream

Conceptually, what I'm trying to do is very simple. I have a Readable stream in node, I pass it to a native c++ addon, and I want to hook it up to an IInputStream.

The native library I'm working with behaves like many c++ (or Java) stream interfaces I've seen. The library provides an IInputStream interface (technically an abstract class); I inherit from it and override the virtual functions. It looks like this:

class JsReadable2InputStream : public IInputStream {

  public:

    // Constructor takes a js v8 object, makes a stream out of it
    JsReadable2InputStream(const v8::Local<v8::Object>& streamObj);
    ~JsReadable2InputStream();

    /**
     * Blocking read. Blocks until the requested amount of data has been read. However, 
     * if the stream reaches its end before the requested amount of bytes has been read
     * it returns the number of bytes read thus far.
     *
     * @param begin      memory into which read data is copied
     * @param byteCount  the requested number of bytes
     * @return           the number of bytes actually read. Is less than byteCount iff
     *                   the end of the stream has been reached.
     */
    virtual int read(char* begin, const int byteCount) override;
    virtual int available() const override; 
    virtual bool isActive() const override; 
    virtual void close() override;

  private:
    Nan::Persistent<v8::Object> _stream;
    bool _active;
    JsEventLoopSync _evtLoop;
};
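
The constructor and destructor aren't shown below; as a minimal sketch, assuming all they need to do is pin and release the js stream object (so it isn't garbage collected while the native library holds the adapter), they might look like this:

JsReadable2InputStream::JsReadable2InputStream(const v8::Local<v8::Object>& streamObj)
  : _stream(streamObj), _active(true) {}

JsReadable2InputStream::~JsReadable2InputStream() {
  // release the persistent handle so the js object can be collected
  _stream.Reset();
}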

Of these functions, the most important one here is read. The native library calls it whenever it wants more data, and the function must block until it can return the requested data (or until the stream ends). Here is my implementation of read:
int JsReadable2InputStream::read(char* begin, const int byteCount) {
  if (!this->_active) { return 0; }
  int read = -1;
  while (read < 0 && this->_active) {
    this->_evtLoop.invoke(
      (voidLambda)[this,&read,begin,byteCount](){
        v8::Local<v8::Object> stream = Nan::New(this->_stream);
        const v8::Local<v8::Function> readFn = Nan::To<v8::Function>(Nan::Get(stream, JS_STR("read")).ToLocalChecked()).ToLocalChecked();
        v8::Local<v8::Value> argv[] = { Nan::New<v8::Number>(byteCount) };
        v8::Local<v8::Value> result = Nan::Call(readFn, stream, 1, argv).ToLocalChecked();
        if (result->IsNull()) {
          // Somewhat hacky/brittle way to check if stream has ended, but it's the only option
          v8::Local<v8::Object> readableState = Nan::To<v8::Object>(Nan::Get(stream, JS_STR("_readableState")).ToLocalChecked()).ToLocalChecked();
          if (Nan::To<bool>(Nan::Get(readableState, JS_STR("ended")).ToLocalChecked()).ToChecked()) {
            // End of stream, all data has been read
            this->_active = false;
            read = 0;
            return;
          }
          // Not enough data available, but stream is still open.
          // Set a flag for the c++ thread to go to sleep
          // This is the case that it gets stuck in
          read = -1;
          return;
        }
        v8::Local<v8::Object> bufferObj = Nan::To<v8::Object>(result).ToLocalChecked();
        int len = Nan::To<int32_t>(Nan::Get(bufferObj, JS_STR("length")).ToLocalChecked()).ToChecked();
        char* buffer = node::Buffer::Data(bufferObj);

        if (len < byteCount) {
          this->_active = false;
        }

        // copy the data out of the buffer
        if (len > 0) {
          std::memcpy(begin, buffer, len);
        }
        read = len;
      }
    );
    if (read < 0) {
      // Give js a chance to read more data
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  }
  return read;
}
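
For context, here's a hypothetical sketch of how the adapter might be handed to the native library from the addon's entry point (the method name and the nativeLib call are illustrative, not my actual code):

NAN_METHOD(ConsumeStream) {
  // info[0] is the Readable stream passed in from js
  v8::Local<v8::Object> streamObj = Nan::To<v8::Object>(info[0]).ToLocalChecked();
  JsReadable2InputStream* input = new JsReadable2InputStream(streamObj);
  // hypothetical: the native library takes ownership of the IInputStream*
  // and calls input->read(...) from its own worker thread
  // nativeLib->setInputStream(input);
}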

The idea is that the c++ code holds a reference to the node stream object. When the native code wants to read, it has to synchronize with the node event loop and then attempt to call read on the node stream. If the node stream returns null, that indicates the data isn't ready yet, so the native thread sleeps, giving the node event loop thread a chance to run and fill its buffers.

This solution works perfectly for a single stream, and even for 2 or 3 streams running in parallel. Then, for some reason, when I hit the magic number of 4+ parallel streams, everything deadlocks completely. None of the streams can successfully read any bytes. The while loop above spins forever, with every call to the node stream's read returning null.

It behaves as if node is being starved and the streams never get a chance to fill with data. However, I've tried adjusting the sleep duration (to larger and to random values) with no effect. It's also clear that the event loop keeps running, since my lambda keeps executing on it (I put some printfs inside to confirm this).

Just in case it might be relevant (I don't think it is), I'm also including my implementation of JsEventLoopSync. This uses libuv to schedule a lambda for execution on the node event loop. It is designed so that only one can be scheduled at a time: other invocations must wait until the first completes.

#include <nan.h>
#include <uv.h>
#include <functional>
#include <stdexcept>

// simplified type declarations for the lambda functions
using voidLambda = std::function<void ()>;

// Synchronize with the node v8 event loop. Invokes a lambda function on the event loop, where access to js objects is safe.
// Blocks execution of the invoking thread until execution of the lambda completes.
class JsEventLoopSync {
  public:
    JsEventLoopSync() : _destroyed(false) {
      // register on the default (same as node) event loop, so that we can execute callbacks in that context
      // This takes a function pointer, which only works with a static function
      this->_handles = new async_handles_t();
      this->_handles->inst = this;
      uv_async_init(uv_default_loop(), &this->_handles->async, JsEventLoopSync::_processUvCb);

      // mechanism for passing this instance through to the native uv callback
      this->_handles->async.data = this->_handles;

      // mutex has to be initialized
      uv_mutex_init(&this->_handles->mutex);
      uv_cond_init(&this->_handles->cond);
    }

    ~JsEventLoopSync() {
      uv_mutex_lock(&this->_handles->mutex);
      // prevent access to deleted instance by callback
      this->_destroyed = true;
      uv_mutex_unlock(&this->_handles->mutex);
      // NOTE: Important, this->_handles must be a dynamically allocated pointer because uv_close() is
      // async, and still has a reference to it. If it were statically allocated as a class member, this
      // destructor would free the memory before uv_close was done with it (leading to asserts in libuv)
      uv_close(reinterpret_cast<uv_handle_t*>(&this->_handles->async), JsEventLoopSync::_asyncClose);
    }

    // called from the native code to invoke the function
    void invoke(const voidLambda& fn) {
      if (v8::Isolate::GetCurrent() != NULL) {
        // Already on the event loop, process now
        return fn();
      }
      // Need to sync with the event loop
      uv_mutex_lock(&this->_handles->mutex);
      if (this->_destroyed) {
        // unlock before bailing out, otherwise the mutex stays held forever
        uv_mutex_unlock(&this->_handles->mutex);
        return;
      }
      this->_fn = fn;
      // this will invoke processUvCb, on the node event loop
      uv_async_send(&this->_handles->async);
      // wait for it to complete processing
      uv_cond_wait(&this->_handles->cond, &this->_handles->mutex);
      uv_mutex_unlock(&this->_handles->mutex);
    }

  private:

    // pulls data out of uv's void* to call the instance method
    static void _processUvCb(uv_async_t* handle) {
      if (handle->data == NULL) { return; }
      auto handles = static_cast<async_handles_t*>(handle->data);
      handles->inst->_process();
    }

    inline static void _asyncClose(uv_handle_t* handle) {
      auto handles = static_cast<async_handles_t*>(handle->data);
      handle->data = NULL;
      uv_mutex_destroy(&handles->mutex);
      uv_cond_destroy(&handles->cond);
      delete handles;
    }

    // Creates the js arguments (populated by invoking the lambda), then invokes the js function
    // Invokes resultLambda on the result
    // Must be run on the node event loop!
    void _process() {
      if (v8::Isolate::GetCurrent() == NULL) {
        // This is unexpected!
        throw std::logic_error("Unable to sync with node event loop for callback!");
      }
      uv_mutex_lock(&this->_handles->mutex);
      if (this->_destroyed) {
        // unlock before bailing out, otherwise the mutex stays held forever
        uv_mutex_unlock(&this->_handles->mutex);
        return;
      }
      Nan::HandleScope scope; // looks unused, but this is very important
      // invoke the lambda
      this->_fn();
      // signal that we're done
      uv_cond_signal(&this->_handles->cond);
      uv_mutex_unlock(&this->_handles->mutex);
    }

    typedef struct async_handles {
      uv_mutex_t mutex;
      uv_cond_t cond;
      uv_async_t async;
      JsEventLoopSync* inst;
    } async_handles_t;

    async_handles_t* _handles;
    voidLambda _fn;
    bool _destroyed;
};
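
For what it's worth, here's a minimal sketch of how JsEventLoopSync gets used from a native worker thread (the lambda body is illustrative):

JsEventLoopSync evtLoop;
int result = 0;
evtLoop.invoke(
  (voidLambda)[&result]() {
    // runs on the node event loop, where v8 access is safe
    result = 42;
  }
);
// invoke() has blocked until the lambda finished; result == 42 here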

So, what am I missing? Is there a better way to wait for the node thread to get a chance to run? Is there a completely different design pattern that would work better? Does node have some upper limit on the number of streams it can handle at once?

As it turns out, the problem I was seeing was actually a client-side limitation. Browsers (and, it seems, node) cap the number of open TCP connections to the same origin. I worked around it by spawning multiple node processes to do my testing.

The code I've shared is perfectly workable if anyone is trying to do something similar. If I ever get some free time, I might turn it into a library.