libcurl 与 macOS Sierra 上的 kevent 异步集成
libcurl async integration with kevent on macOS Sierra
我正在将 curl 集成到基于 kqueue 的异步 I/O 事件循环中。
libcurl 具有出色的 API 功能,可以集成到应用程序事件循环中。
您为 libcurl 提供两个回调,一个用于设置计时器(用于限制 request/connect 次),另一个用于为 read/write/error 事件注册 libcurl 的文件描述符。
用于执行 FD 注册的回调文档在这里:CURLMOPT_SOCKETFUNCTION
通知回调函数 libcurl 感兴趣的事件的参数有四个枚举值:
CURL_POLL_IN
Wait for incoming data. For the socket to become readable.
CURL_POLL_OUT
Wait for outgoing data. For the socket to become writable.
CURL_POLL_INOUT
Wait for incoming and outgoing data. For the socket to become readable or writable.
CURL_POLL_REMOVE
The specified socket/file descriptor is no longer used by libcurl.
虽然没有明确记录,但 libcurl 期望在后续调用回调时更新事件循环的过滤器状态以匹配它传递的内容。即,如果在第一次调用时它通过了 CURL_POLL_IN
(EVFILT_READ
) 并且在随后的调用中它通过了 CURL_POLL_OUT
(EVFILT_WRITE
),那么原始的 EVFILT_READ
过滤器将被移除。
我更新了 FD 注册码来处理这个问题。
int fr_event_fd_insert(fr_event_list_t *el, int fd,
fr_event_fd_handler_t read,
fr_event_fd_handler_t write,
fr_event_fd_handler_t error,
void *ctx)
{
int filter = 0;
struct kevent evset[2];
struct kevent *ev_p = evset;
fr_event_fd_t *ef, find;
if (!el) {
fr_strerror_printf("Invalid argument: NULL event list");
return -1;
}
if (!read && !write) {
fr_strerror_printf("Invalid arguments: NULL read and write callbacks");
return -1;
}
if (fd < 0) {
fr_strerror_printf("Invalid arguments: Bad FD %i", fd);
return -1;
}
if (el->exit) {
fr_strerror_printf("Event loop exiting");
return -1;
}
memset(&find, 0, sizeof(find));
/*
* Get the existing fr_event_fd_t if it exists.
*/
find.fd = fd;
ef = rbtree_finddata(el->fds, &find);
if (!ef) {
ef = talloc_zero(el, fr_event_fd_t);
if (!ef) {
fr_strerror_printf("Out of memory");
return -1;
}
talloc_set_destructor(ef, _fr_event_fd_free);
el->num_fds++;
ef->fd = fd;
rbtree_insert(el->fds, ef);
/*
* Existing filters will be overwritten if there's
* a new filter which takes their place. If there
* is no new filter however, we need to delete the
* existing one.
*/
} else {
if (ef->read && !read) filter |= EVFILT_READ;
if (ef->write && !write) filter |= EVFILT_WRITE;
if (filter) {
EV_SET(ev_p++, ef->fd, filter, EV_DELETE, 0, 0, 0);
filter = 0;
}
/*
* I/O handler may delete an event, then
* re-add it. To avoid deleting modified
* events we unset the do_delete flag.
*/
ef->do_delete = false;
}
ef->ctx = ctx;
if (read) {
ef->read = read;
filter |= EVFILT_READ;
}
if (write) {
ef->write = write;
filter |= EVFILT_WRITE;
}
ef->error = error;
EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
if (kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL) < 0) {
fr_strerror_printf("Failed inserting event for FD %i: %s", fd, fr_syserror(errno));
talloc_free(ef);
return -1;
}
ef->is_registered = true;
return 0;
}
不幸的是,它不起作用。 kevent 似乎没有删除旧过滤器(我们继续收到他们的通知)。
奇怪的是,如果我在两个单独的调用中应用这两个操作,它会完美地工作。
if (filter) {
EV_SET(&evset, ef->fd, filter, EV_DELETE, 0, 0, 0);
kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL);
filter = 0;
}
这是 Sierra 的 kevent 实现中的错误,还是我误解了 kevent 的工作原理?
这里的问题是,您不能 'or' 将 EVFILT_READ
和 EVFILT_WRITE
标志放在一起。
启用或禁用多个过滤器时,您需要在多个 evset
结构上多次调用 EV_SET()
。
上面例子中的非功能代码:
struct kevent evset[2];
struct kevent *ev_p = evset;
if (read) {
ef->read = read;
filter |= EVFILT_READ;
}
if (write) {
ef->write = write;
filter |= EVFILT_WRITE;
}
ef->error = error;
EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
event(el->kq, evset, ev_p - evset, NULL, 0, NULL)
变为:
int count = 0;
struct ev_set[2];
if (read) {
ef->read = read;
EV_SET(ev_set[count++], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ef);
}
if (write) {
ef->write = write;
EV_SET(ev_set[count++], fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ef);
}
ef->error = error;
kevent(el->kq, ev_set, count, NULL, 0, NULL)
进行此更改后,一切都按预期进行。
我正在将 curl 集成到基于 kqueue 的异步 I/O 事件循环中。
libcurl 具有出色的 API 功能,可以集成到应用程序事件循环中。
您为 libcurl 提供两个回调,一个用于设置计时器(用于限制 request/connect 次),另一个用于为 read/write/error 事件注册 libcurl 的文件描述符。
用于执行 FD 注册的回调文档在这里:CURLMOPT_SOCKETFUNCTION
通知回调函数 libcurl 感兴趣的事件的参数有四个枚举值:
CURL_POLL_IN
Wait for incoming data. For the socket to become readable.
CURL_POLL_OUT
Wait for outgoing data. For the socket to become writable.
CURL_POLL_INOUT
Wait for incoming and outgoing data. For the socket to become readable or writable.
CURL_POLL_REMOVE
The specified socket/file descriptor is no longer used by libcurl.
虽然没有明确记录,但 libcurl 期望在后续调用回调时更新事件循环的过滤器状态以匹配它传递的内容。即,如果在第一次调用时它通过了 CURL_POLL_IN
(EVFILT_READ
) 并且在随后的调用中它通过了 CURL_POLL_OUT
(EVFILT_WRITE
),那么原始的 EVFILT_READ
过滤器将被移除。
我更新了 FD 注册码来处理这个问题。
int fr_event_fd_insert(fr_event_list_t *el, int fd,
fr_event_fd_handler_t read,
fr_event_fd_handler_t write,
fr_event_fd_handler_t error,
void *ctx)
{
int filter = 0;
struct kevent evset[2];
struct kevent *ev_p = evset;
fr_event_fd_t *ef, find;
if (!el) {
fr_strerror_printf("Invalid argument: NULL event list");
return -1;
}
if (!read && !write) {
fr_strerror_printf("Invalid arguments: NULL read and write callbacks");
return -1;
}
if (fd < 0) {
fr_strerror_printf("Invalid arguments: Bad FD %i", fd);
return -1;
}
if (el->exit) {
fr_strerror_printf("Event loop exiting");
return -1;
}
memset(&find, 0, sizeof(find));
/*
* Get the existing fr_event_fd_t if it exists.
*/
find.fd = fd;
ef = rbtree_finddata(el->fds, &find);
if (!ef) {
ef = talloc_zero(el, fr_event_fd_t);
if (!ef) {
fr_strerror_printf("Out of memory");
return -1;
}
talloc_set_destructor(ef, _fr_event_fd_free);
el->num_fds++;
ef->fd = fd;
rbtree_insert(el->fds, ef);
/*
* Existing filters will be overwritten if there's
* a new filter which takes their place. If there
* is no new filter however, we need to delete the
* existing one.
*/
} else {
if (ef->read && !read) filter |= EVFILT_READ;
if (ef->write && !write) filter |= EVFILT_WRITE;
if (filter) {
EV_SET(ev_p++, ef->fd, filter, EV_DELETE, 0, 0, 0);
filter = 0;
}
/*
* I/O handler may delete an event, then
* re-add it. To avoid deleting modified
* events we unset the do_delete flag.
*/
ef->do_delete = false;
}
ef->ctx = ctx;
if (read) {
ef->read = read;
filter |= EVFILT_READ;
}
if (write) {
ef->write = write;
filter |= EVFILT_WRITE;
}
ef->error = error;
EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
if (kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL) < 0) {
fr_strerror_printf("Failed inserting event for FD %i: %s", fd, fr_syserror(errno));
talloc_free(ef);
return -1;
}
ef->is_registered = true;
return 0;
}
不幸的是,它不起作用。 kevent 似乎没有删除旧过滤器(我们继续收到他们的通知)。
奇怪的是,如果我在两个单独的调用中应用这两个操作,它会完美地工作。
if (filter) {
EV_SET(&evset, ef->fd, filter, EV_DELETE, 0, 0, 0);
kevent(el->kq, evset, ev_p - evset, NULL, 0, NULL);
filter = 0;
}
这是 Sierra 的 kevent 实现中的错误,还是我误解了 kevent 的工作原理?
这里的问题是,您不能 'or' 将 EVFILT_READ
和 EVFILT_WRITE
标志放在一起。
启用或禁用多个过滤器时,您需要在多个 evset
结构上多次调用 EV_SET()
。
上面例子中的非功能代码:
struct kevent evset[2];
struct kevent *ev_p = evset;
if (read) {
ef->read = read;
filter |= EVFILT_READ;
}
if (write) {
ef->write = write;
filter |= EVFILT_WRITE;
}
ef->error = error;
EV_SET(ev_p++, fd, filter, EV_ADD | EV_ENABLE, 0, 0, ef);
event(el->kq, evset, ev_p - evset, NULL, 0, NULL)
变为:
int count = 0;
struct ev_set[2];
if (read) {
ef->read = read;
EV_SET(ev_set[count++], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ef);
}
if (write) {
ef->write = write;
EV_SET(ev_set[count++], fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ef);
}
ef->error = error;
kevent(el->kq, ev_set, count, NULL, 0, NULL)
进行此更改后,一切都按预期进行。