Ruby C 扩展如何存储 proc 以供以后执行?

How can a Ruby C extension store a proc for later execution?

目标:允许 c 扩展接收 block/proc 延迟执行,同时保留当前执行上下文。

我在 c 中有一个方法(暴露给 ruby)接受 callback(通过 VALUE hash 参数)或 block.

// For brevity, lets assume m_CBYO is setup to make a CBYO module available to ruby
extern VALUE m_CBYO;
VALUE CBYO_add_callback(VALUE callback)
{
    if (rb_block_given_p()) {
        callback = rb_block_proc();
    }

    if (NIL_P(callback)) {
        rb_raise(rb_eArgError, "either a block or callback proc is required");
    }

    // method is called here to add the callback proc to rb_callbacks
}
rb_define_module_function(m_CBYO, "add_callback", CBYO_add_callback, 1);

我有一个结构,我用它来存储这些和一些额外的数据:

struct rb_callback
{
    VALUE rb_cb;
    unsigned long long lastcall;
    struct rb_callback *next;
};
static struct rb_callback *rb_callbacks = NULL;

时间到了(由 epoll 触发),我遍历回调并执行每个回调:

rb_funcall(cb->rb_cb, rb_intern("call"), 0);

发生这种情况时,我看到它成功执行了回调中的 ruby 代码,但是,它正在转义当前执行上下文。

示例:

# From ruby including the above extension
CBYO.add_callback do
    puts "Hey now."
end

loop do
    puts "Waiting for signal..."
    sleep 1
end

收到信号时(通过 epoll)我将看到以下内容:

$> Waiting for signal...
$> Waiting for signal...
$> Hey now.
$> // process hangs
$> // Another signal occurs
$> [BUG] vm_call_cfunc - cfp consistency error

有时,在 bug 再次出现之前,我可以获得多个信号进行处理。

我在调查时找到了答案 a similar issue

事实证明,我也在尝试使用 MRI 不支持的本机线程信号(pthread_create)。

TLDR; Ruby VM 当前(在撰写本文时)不是线程安全的。查看 this nice write-up on Ruby Threading 以更好地全面了解如何在这些范围内工作。

您可以使用Ruby的native_thread_create(rb_thread_t *th) which will use pthread_create behind the scenes. There are some drawbacks that you can read about in the documentation above the method definition. You can then run the callback with Ruby's rb_thread_call_with_gvl方法。另外,我在这里没有这样做,但是创建一个包装器方法可能是个好主意,这样您就可以使用 rb_protect 来处理回调可能引发的异常(否则它们将被 VM 吞没)。

VALUE execute_callback(VALUE callback)
{
    return rb_funcall(callback, rb_intern("call"), 0);
}

// execute the callback when the thread receives signal
rb_thread_call_with_gvl(execute_callback, data->callback);