Why would adding MySQL code to an epoll based web crawler slow it down so much?

I have working code that crawls websites using libcurl and epoll:

https://github.com/JamesRead5737/libcurlmemoryleak/blob/master/crawler.c

Typical output is:

Parsed sites: 0, 1024 parallel connections, 10989 still running, 10989 transfers    Exiting normally.
Parsed sites: 0, 0 parallel connections, 0 still running, 0 transfers    Finished all in progress downloads.
Exiting.

As you can see, the code reaches the hard-coded limit of 1024 parallel connections. It consumes up to 3 Gbps on my dedicated server.

Adding MySQL code like this:

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <netdb.h>
#include <time.h>
#include <netinet/in.h>
#include <mysql.h> 
#include <curl/curl.h>

#define MSG_OUT stdout
#define DEFAULT_QUEUE_LENGTH 10000
#define mycase(code) \
        case code: s = __STRING(code)

#define MAX_CONNECTIONS 1024

MYSQL *mysql_con;

/* Global information, common to all connections */
typedef struct _GlobalInfo
{
    int epfd;    /* epoll filedescriptor */
    int tfd;     /* timer filedescriptor */
    CURLM *multi;
    int still_running;
    pthread_mutex_t lock;
    int concurrent_connections;
    pthread_mutex_t parsed_lock;
    int parsed_sites;
    int transfers;
} GlobalInfo;

int new_body_conn(char *url, GlobalInfo *g);

/* Information associated with a specific easy handle */
typedef struct _ConnInfo
{
    CURL *easy;
    char *url;
    GlobalInfo *global;
    char error[CURL_ERROR_SIZE];
    size_t size;
    char *data;
} ConnInfo;

/* Information associated with a specific socket */
typedef struct _SockInfo
{
    curl_socket_t sockfd;
    CURL *easy;
    int action;
    long timeout;
    GlobalInfo *global;
} SockInfo;

void
mysql_stop()
{
    mysql_close(mysql_con);
}

void
mysql_start()
{
    mysql_con = mysql_init(NULL);
    if (mysql_con == NULL)
    {
        /* No valid handle yet, so don't ask it for an error string */
        fprintf(stderr, "mysql_init() failed\n");
        exit(1);
    }

    if (mysql_real_connect(mysql_con, "localhost", "crawler", "password", "crawl", 0, NULL, 0) == NULL)
    {
        fprintf(stderr, "%s\n", mysql_error(mysql_con));
        exit(EXIT_FAILURE);
    }

    if (mysql_query(mysql_con, "CREATE TABLE IF NOT EXISTS `frontier` (`url` varchar(2084) NOT NULL, `id` int NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`), UNIQUE KEY `url` (`url`), KEY `url_2` (`url`)) ENGINE=InnoDB AUTO_INCREMENT=1"))
    {
        fprintf(stderr, "%s\n", mysql_error(mysql_con));
        mysql_stop();
        exit(1);
    }

    if (mysql_query(mysql_con, "CREATE TABLE IF NOT EXISTS `crawled` (`id` int NOT NULL AUTO_INCREMENT, `url` varchar(2084) DEFAULT NULL, `title` varchar(768) DEFAULT NULL, `date` varchar(128) DEFAULT NULL, `last_modified` varchar(128) DEFAULT NULL, `links` int DEFAULT NULL, `backlinks` int DEFAULT NULL, `http_code` int DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `url` (`url`), KEY `http_code` (`http_code`), KEY `title` (`title`)) ENGINE=InnoDB AUTO_INCREMENT=1"))
    {
        fprintf(stderr, "%s\n", mysql_error(mysql_con));
        mysql_stop();
        exit(1);
    }

    if (mysql_query(mysql_con, "CREATE TABLE IF NOT EXISTS `emails` (`email` varchar(2084) NOT NULL, `id` int NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`), UNIQUE KEY `email` (`email`), KEY `email_2` (`email`)) ENGINE=InnoDB AUTO_INCREMENT=737 DEFAULT CHARSET=latin1"))
    {
        fprintf(stderr, "%s\n", mysql_error(mysql_con));
        mysql_stop();
        exit(1);
    }
}

void
mysql_url_visited_push(char *url, char *title)
{
    char sql[8192];
    char escaped_url[(strlen(url)*2)+1];

    mysql_real_escape_string(mysql_con, escaped_url, url, strlen(url));
    if (title != NULL)
    {
        char escaped_title[(strlen(title)*2)+1];

        mysql_real_escape_string(mysql_con, escaped_title, title, strlen(title));
        sprintf(sql, "INSERT INTO crawled (url, title) VALUES ('%s', '%s')", escaped_url, escaped_title);
    } else {
        /* No title: insert SQL NULL rather than passing a NULL pointer to %s */
        sprintf(sql, "INSERT INTO crawled (url, title) VALUES ('%s', NULL)", escaped_url);
    }
    if (mysql_query(mysql_con, sql))
    {
        fprintf(stderr, "mysql_url_visited_push sql=%s %s\n", sql, mysql_error(mysql_con));
    }
}

int
mysql_url_visited_find(char *url)
{
    char sql[8192];
    char escaped_url[(strlen(url)*2)+1];
    int found = 0;

    mysql_real_escape_string(mysql_con, escaped_url, url, strlen(url));
    sprintf(sql, "SELECT * FROM crawled WHERE url = '%s'", escaped_url);
    if (mysql_query(mysql_con, sql))
    {
        fprintf(stderr, "mysql_url_visited_find sql=%s %s\n", sql, mysql_error(mysql_con));
    } else {
        MYSQL_RES *result = mysql_store_result(mysql_con);
        if (!result)
        {
            fprintf(stderr, "%s\n", mysql_error(mysql_con));
        } else {
            /* Check the row count; the result pointer itself is always
             * non-NULL for a successful SELECT, even with zero rows. */
            found = mysql_num_rows(result) > 0;
            mysql_free_result(result);
        }
    }
    return found;
}

int
mysql_url_frontier_find(char *url)
{
    char sql[8192];
    char escaped_url[(strlen(url)*2)+1];
    int found = 0;

    mysql_real_escape_string(mysql_con, escaped_url, url, strlen(url));
    sprintf(sql, "SELECT * FROM frontier WHERE url = '%s'", escaped_url);
    if (mysql_query(mysql_con, sql))
    {
        fprintf(stderr, "mysql_url_frontier_find sql=%s %s\n", sql, mysql_error(mysql_con));
    } else {
        MYSQL_RES *result = mysql_store_result(mysql_con);
        if (!result)
        {
            fprintf(stderr, "%s\n", mysql_error(mysql_con));
        } else {
            /* Same fix as mysql_url_visited_find: test row count, not pointer */
            found = mysql_num_rows(result) > 0;
            mysql_free_result(result);
        }
    }
    return found;
}

void 
mysql_url_frontier_push(char *url)
{
    char sql[8192];
    char escaped_url[(strlen(url)*2)+1];

    mysql_real_escape_string(mysql_con, escaped_url, url, strlen(url));
    sprintf(sql, "INSERT IGNORE INTO frontier (url) VALUES ('%s')", escaped_url);
    if (mysql_query(mysql_con, sql))
    {
        fprintf(stderr, "mysql_url_frontier_push sql=%s %s\n", sql, mysql_error(mysql_con));
    }
}

char *
mysql_url_frontier_pop()
{
    char *url = NULL;
    char sql[8192];

    if (mysql_query(mysql_con, "SELECT url FROM frontier ORDER BY id") == 0)
    {
        MYSQL_ROW row;
        MYSQL_RES *result = mysql_store_result(mysql_con);
        if (result == NULL)
        {
            /* sql has not been filled in yet, so don't print it here */
            fprintf(stderr, "mysql_url_frontier_pop mysql_store_result %s\n", mysql_error(mysql_con));
            exit(EXIT_FAILURE);
        }
        if ((row = mysql_fetch_row(result)))
        {
            url = strdup(row[0]);
            char escaped_url[(strlen(url)*2)+1];

            mysql_real_escape_string(mysql_con, escaped_url, url, strlen(url));
            sprintf(sql, "DELETE FROM frontier WHERE url = '%s'", escaped_url);
            if (mysql_query(mysql_con, sql))
            {
                fprintf(stderr, "mysql_url_frontier_pop mysql_query sql=%s %s\n", sql, mysql_error(mysql_con));
                exit(EXIT_FAILURE);
            }
        }
        mysql_free_result(result);
    }
    return url;
}

int
starts_with(const char *str, const char *pre)
{
        size_t lenstr;
        size_t lenpre;

        if (str == NULL || pre == NULL)
                return (-1);

        lenstr = strlen(str);
        lenpre = strlen(pre);

        if (lenstr < lenpre)
                return (-1);

    return (memcmp(pre, str, lenpre));
}

char *
url_sanitize(char *base_url, char *url, int size)
{
        char *newurl;
        int base_url_len = strlen(base_url);

        if (starts_with(url, "http") == 0) {
                newurl = malloc(size+1);
                if (newurl == NULL) {
                        fprintf(stderr, "1 malloc() of %d bytes, failed\n", size);
                        exit(1);
                }

                strncpy(newurl, url, size);
                newurl[size] = '\0';

        } else {
                if (starts_with(url, "//") == 0) {
                        newurl = malloc(size+7);
                        if (newurl == NULL) {
                                fprintf(stderr, "2 malloc() of %d bytes, failed\n", size);
                                exit(1);
                        }

                        strncpy(newurl, "https:", 6);
                        strncpy(newurl+6, url, size);
                        newurl[size+6] = '\0';
                } else {
                        newurl = malloc(base_url_len + size + 2);
                        if (newurl == NULL) {
                                fprintf(stderr, "3 malloc() of %d bytes, failed\n", size);
                                exit(1);
                        }

                        strncpy(newurl, base_url, base_url_len);
                        strncpy(newurl + base_url_len, url, size);
                        newurl[size + base_url_len] = '\0';
                }
        }

        return (newurl);
}

char *
html_title_find(char *html)
{
        char *newurl, *first, *last;
        int size = 0;

        first = strstr(html, "<title>");
        if (first == NULL)
                return (NULL);

        first += strlen("<title>");

        last = strstr(first, "</title>");
        if (last == NULL)
                return (NULL);

        size = last - first;

        newurl = malloc(size+1);
        if (newurl == NULL) {
                fprintf(stderr, "4 malloc() of %d bytes, failed\n", size);
                exit(1);
        }

        strncpy(newurl, first, size);
        newurl[size] = '\0';

        return (newurl);
}

void
html_link_find(char *url, char *html)
{
        char *first, *last, *newurl;
        int size = 0;

        first = html;
        last = html; /* give 'last' a defined value before the first loop test */

        while (first && last) {
                        first = strstr(first, "href=\"");
                        if (first == NULL)
                                continue;

                        first += strlen("href=\"");

                        last = strchr(first, '\"');
                        if (last == NULL)
                                continue;

                        size = last - first;

                        newurl = url_sanitize(url, first, size);

                        if (strstr(newurl, "mailto")) {
                                free(newurl);
                                continue;
                        } else {
                if (!mysql_url_visited_find(newurl) && !mysql_url_frontier_find(newurl)) {
                    mysql_url_frontier_push(newurl);
                }
                free(newurl);
            }

        }
}

void
parsed_sites_inc(GlobalInfo *g)
{
        g->parsed_sites++;
}

void
html_parse(char *url, char *html)
{
    char *title;

    title = html_title_find(html);
    html_link_find(url, html);
    mysql_url_visited_push(url, title);

    free(title);
}

/* Die if we get a bad CURLMcode somewhere */ 
static void
mcode_or_die(const char *where, CURLMcode code)
{
    if (CURLM_OK != code) {
        const char *s;

        switch (code) {
            mycase(CURLM_BAD_HANDLE); break;
            mycase(CURLM_BAD_EASY_HANDLE); break;
            mycase(CURLM_OUT_OF_MEMORY); break;
            mycase(CURLM_INTERNAL_ERROR); break;
            mycase(CURLM_UNKNOWN_OPTION); break;
            mycase(CURLM_LAST); break;
            default: s = "CURLM_unknown"; break;
            mycase(CURLM_BAD_SOCKET);
            fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
            /* ignore this error */ 
            return;
        }

        fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
        exit(code);
    }
}

void
print_progress(GlobalInfo *g)
{
    printf("\rParsed sites: %d, %d parallel connections, %d still running, %d transfers\t", 
            g->parsed_sites, g->concurrent_connections, g->still_running, g->transfers);
    fflush(stdout);
}

void
transfers_inc(GlobalInfo *g)
{
    g->transfers++;

    print_progress(g);
}

void
transfers_dec(GlobalInfo *g)
{
    g->transfers--;

    print_progress(g);
}

void
concurrent_connections_inc(GlobalInfo *g)
{
    g->concurrent_connections++;

    print_progress(g);
}

void
concurrent_connections_dec(GlobalInfo *g)
{
    g->concurrent_connections--;

    print_progress(g);
}

static void timer_cb(GlobalInfo* g, int revents);

/* Update the timer after the curl_multi library does its thing. curl will
 * inform us through this callback what it wants the new timeout to be,
 * after it does some work. */
static int
multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo *g)
{
    struct itimerspec its;

    //fprintf(MSG_OUT, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);

    if (timeout_ms > 0) {
        its.it_interval.tv_sec = 1;
        its.it_interval.tv_nsec = 0;
        its.it_value.tv_sec = timeout_ms / 1000;
        its.it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000;
    } else if(timeout_ms == 0) {
        /* libcurl wants us to timeout now, however setting both fields of
         * new_value.it_value to zero disarms the timer. The closest we can
         * do is to schedule the timer to fire in 1 ns. */ 
        its.it_interval.tv_sec = 1;
        its.it_interval.tv_nsec = 0;
        its.it_value.tv_sec = 0;
        its.it_value.tv_nsec = 1;
    } else {
        memset(&its, 0, sizeof(struct itimerspec));
    }

    timerfd_settime(g->tfd, /*flags=*/ 0, &its, NULL);

    return (0);
}

/* Check for completed transfers, and remove their easy handles */ 
static void
check_multi_info(GlobalInfo *g)
{
    char *eff_url;
    CURLMsg *msg;
    int msgs_left;
    ConnInfo *conn;
    CURL *easy;
    char *ct;
    double time;
    double dl;
    long header_size;
    long response_code;
    //CURLcode res;

    while ((msg = curl_multi_info_read(g->multi, &msgs_left))) {
        if (msg->msg == CURLMSG_DONE) {
            easy = msg->easy_handle;
            //res = msg->data.result;
            curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
            curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
            curl_easy_getinfo(easy, CURLINFO_CONTENT_TYPE, &ct);
            curl_easy_getinfo(easy, CURLINFO_TOTAL_TIME, &time);
            curl_easy_getinfo(easy, CURLINFO_SIZE_DOWNLOAD, &dl);
            curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &response_code);
            curl_easy_getinfo(easy, CURLINFO_HEADER_SIZE, &header_size);

            if (response_code == 200 && dl == 0.0 && (starts_with(ct, "text/html") == 0 || starts_with(ct, "text/plain") == 0))
            {
                /* This should be a response to our HEAD request */
                //printf("200 %s header size: %ld download size: %f", eff_url, header_size, dl);
                new_body_conn(eff_url, g);

            } else if (response_code == 200 && dl > 0.0 && (starts_with(ct, "text/html") == 0 || starts_with(ct, "text/plain") == 0)){
                /* This should be a response to our GET request */
                //printf("%ld %s download size: %f content type: %s\n", response_code, eff_url, dl, ct);
                html_parse(eff_url, conn->data);
                parsed_sites_inc(g);
            }
            //fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);

            curl_multi_remove_handle(g->multi, easy);
            //free(conn->url);
            free(conn->data);
            curl_easy_cleanup(easy);
            transfers_dec(g);
            free(conn);
        }
    }
}

/* Called by the main loop when we get action on a multi socket filedescriptor */
static void
event_cb(GlobalInfo *g, int fd, int revents)
{
    CURLMcode rc;
    struct itimerspec its;

    int action = ((revents & EPOLLIN) ? CURL_CSELECT_IN : 0) |
                 ((revents & EPOLLOUT) ? CURL_CSELECT_OUT : 0);

    rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
    mcode_or_die("event_cb: curl_multi_socket_action", rc);

    check_multi_info(g);

    if (g->still_running <= 0) {
        //fprintf(MSG_OUT, "last transfer done, kill timeout\n");
        memset(&its, 0, sizeof(struct itimerspec));
        timerfd_settime(g->tfd, 0, &its, NULL);
    }
}

/* Called by main loop when our timeout expires */ 
static void
timer_cb(GlobalInfo* g, int revents)
{
    CURLMcode rc;
    uint64_t count = 0;
    ssize_t err = 0;

    err = read(g->tfd, &count, sizeof(uint64_t));
    if (err == -1) {
        /* Note that we may call the timer callback even if the timerfd isn't
         * readable. It's possible that there are multiple events stored in the
         * epoll buffer (i.e. the timer may have fired multiple times). The
         * event count is cleared after the first call so future events in the
         * epoll buffer will fail to read from the timer. */ 
        if (errno == EAGAIN) {
            //fprintf(MSG_OUT, "EAGAIN on tfd %d\n", g->tfd);
            return;
        }
    }

    if (err != sizeof(uint64_t)) {
        fprintf(stderr, "read(tfd) == %ld", err);
        perror("read(tfd)");
    }

    rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
    mcode_or_die("timer_cb: curl_multi_socket_action", rc);
    check_multi_info(g);
}

/* Assign information to a SockInfo structure */ 
static void
setsock(SockInfo *f, curl_socket_t s, CURL *e, int act, GlobalInfo *g)
{
    struct epoll_event ev;
    int kind = ((act & CURL_POLL_IN) ? EPOLLIN : 0) |
               ((act & CURL_POLL_OUT) ? EPOLLOUT : 0);

    if (f->sockfd) {
        concurrent_connections_dec(g);
        if (epoll_ctl(g->epfd, EPOLL_CTL_DEL, f->sockfd, NULL))
            fprintf(stderr, "EPOLL_CTL_DEL failed for fd: %d : %s\n",
              f->sockfd, strerror(errno));
    }

    f->sockfd = s;
    f->action = act;
    f->easy = e;

    ev.events = kind;
    ev.data.fd = s;

    concurrent_connections_inc(g);
    if (epoll_ctl(g->epfd, EPOLL_CTL_ADD, s, &ev)) {
        fprintf(stderr, "EPOLL_CTL_ADD failed for fd: %d : %s\n",
          s, strerror(errno));
    }
}

/* Initialize a new SockInfo structure */ 
static void
addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
{
    SockInfo *fdp = (SockInfo *)calloc(1, sizeof(SockInfo));

    fdp->global = g;
    setsock(fdp, s, easy, action, g);
    curl_multi_assign(g->multi, s, fdp);
}

static size_t
write_cb(void *contents, size_t size, size_t nmemb, void *p)
{
    ConnInfo *conn = (ConnInfo *)p;
    size_t realsize = size * nmemb;

    char *newdata = realloc(conn->data, conn->size + realsize + 1);
    if (newdata == NULL) {
        /* out of memory; the old block is still owned by conn->data */
        printf("not enough memory (realloc returned NULL)\n");
        return 0;
    }
    conn->data = newdata;

    memcpy(&(conn->data[conn->size]), contents, realsize);
    conn->size += realsize;
    conn->data[conn->size] = 0;

    return realsize;
}

/* Create a new easy handle, and add it to the global curl_multi */ 
int
new_head_conn(char *url, GlobalInfo *g)
{
    ConnInfo *conn;
    CURLMcode rc;

    conn = (ConnInfo*)calloc(1, sizeof(ConnInfo));
    conn->error[0]='\0';
    conn->global = g;

    conn->easy = curl_easy_init();
    if (!conn->easy) {
        fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
        exit(2);
    }
    transfers_inc(g);

    conn->url = url;
    curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
    curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
    curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, conn);
    curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
    curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
    curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, 1L);
    curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
    curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 3L);
    curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 100L);
    curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 10L);
    curl_easy_setopt(conn->easy, CURLOPT_CLOSESOCKETDATA, g);
    curl_easy_setopt(conn->easy, CURLOPT_NOBODY, 1L);

    rc = curl_multi_add_handle(g->multi, conn->easy);
    mcode_or_die("new_conn: curl_multi_add_handle", rc);

    /* note that the add_handle() will set a time-out to trigger very soon so
     that the necessary socket_action() call will be called by this app */ 

    return (0);
}

/* Create a new easy handle, and add it to the global curl_multi */
int
new_body_conn(char *url, GlobalInfo *g)
{
        ConnInfo *conn;
        CURLMcode rc;

        conn = (ConnInfo*)calloc(1, sizeof(ConnInfo));
        conn->error[0]='\0';
        conn->global = g;

        conn->easy = curl_easy_init();
        if (!conn->easy) {
                fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
                exit(2);
        }
        transfers_inc(g);

        conn->url = url;
        curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
        curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
        curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, conn);
        curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
        curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
        curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
        curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, 1L);
        curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
        curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 3L);
        curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 100L);
        curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 10L);
        curl_easy_setopt(conn->easy, CURLOPT_CLOSESOCKETDATA, g);

        rc = curl_multi_add_handle(g->multi, conn->easy);
        mcode_or_die("new_conn: curl_multi_add_handle", rc);

        /* note that the add_handle() will set a time-out to trigger very soon so
           that the necessary socket_action() call will be called by this app */

        return (0);
}

/* Clean up the SockInfo structure */ 
static void
remsock(SockInfo *f, GlobalInfo* g)
{
    if (f) {
        if (f->sockfd) {
            concurrent_connections_dec(g);
            if (epoll_ctl(g->epfd, EPOLL_CTL_DEL, f->sockfd, NULL))
                fprintf(stderr, "EPOLL_CTL_DEL failed for fd: %d : %s\n",
                  f->sockfd, strerror(errno));
        }

        free(f);
    }
}

/* CURLMOPT_SOCKETFUNCTION */ 
static int
sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
{
    GlobalInfo *g = (GlobalInfo*) cbp;
    SockInfo *fdp = (SockInfo*) sockp;

    if (what == CURL_POLL_REMOVE) {
        remsock(fdp, g);
    } else {
        if (g->concurrent_connections < MAX_CONNECTIONS){
            if (!fdp) {
                addsock(s, e, what, g);
            } else {
                setsock(fdp, s, e, what, g);
            }
        }
    }

    return (0);
}

/* CURLMOPT_SOCKETFUNCTION */
static int
end_sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
{
        GlobalInfo *g = (GlobalInfo*) cbp;
        SockInfo *fdp = (SockInfo*) sockp;

        if (what == CURL_POLL_REMOVE) {
                remsock(fdp, g);
        }

        return (0);
}


int should_exit = 0;

void
signal_handler(int signo)
{
    should_exit = 1;
}

void *
crawler_init()
{
    GlobalInfo g;
    struct itimerspec its;
    struct epoll_event ev;
    struct epoll_event events[10000];


    memset(&g, 0, sizeof(GlobalInfo));

    g.transfers = 0;
    g.parsed_sites = 0;

    g.epfd = epoll_create1(EPOLL_CLOEXEC);
    if (g.epfd == -1) {
        perror("epoll_create1 failed\n");
        exit(1);
    }

    g.tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (g.tfd == -1) {
        perror("timerfd_create failed\n");
        exit(1);
    }

    memset(&its, 0, sizeof(struct itimerspec));
    its.it_interval.tv_sec = 1;
    its.it_value.tv_sec = 1;
    timerfd_settime(g.tfd, 0, &its, NULL);

    ev.events = EPOLLIN;
    ev.data.fd = g.tfd;
    epoll_ctl(g.epfd, EPOLL_CTL_ADD, g.tfd, &ev);

    curl_global_init(CURL_GLOBAL_DEFAULT);
    g.multi = curl_multi_init();

    /* setup the generic multi interface options we want */ 
    curl_multi_setopt(g.multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
    curl_multi_setopt(g.multi, CURLMOPT_SOCKETDATA, &g);
    curl_multi_setopt(g.multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
    curl_multi_setopt(g.multi, CURLMOPT_TIMERDATA, &g);

    /* we don't call any curl_multi_socket*() function yet as we have no handles added! */ 

    //printf("Starting crawler...\n");

    while (!should_exit) {
        int idx;
        int err = epoll_wait(g.epfd, events, sizeof(events)/sizeof(struct epoll_event), 10000);
        char *url;

        /* Note: this pop is a synchronous database round trip on every
         * iteration, even when epoll_wait() reported no events. */
        url = mysql_url_frontier_pop();
        if (url)
            new_head_conn(url, &g);

        if (err == -1) {
            if (errno == EINTR) {
                fprintf(MSG_OUT, "note: wait interrupted\n");
                continue;
            } else {
                perror("epoll_wait");
                exit(1);
            }
        }

        for (idx = 0; idx < err; ++idx) {
            if (events[idx].data.fd == g.tfd) {
                timer_cb(&g, events[idx].events);
            } else {
                event_cb(&g, events[idx].data.fd, events[idx].events);
            }
        }
    }

    fprintf(MSG_OUT, "Exiting normally.\n");
    fflush(MSG_OUT);

    curl_multi_setopt(g.multi, CURLMOPT_SOCKETFUNCTION, end_sock_cb);
    while (g.concurrent_connections > 0 || g.transfers > 0)
    {
        int idx;
        int err = epoll_wait(g.epfd, events, sizeof(events)/sizeof(struct epoll_event), 10000);

        if (err == -1) {
            if (errno == EINTR) {
                fprintf(MSG_OUT, "note: wait interrupted\n");
                continue;
            } else {
                perror("epoll_wait");
                exit(1);
            }
        }

        for (idx = 0; idx < err; ++idx) {
            if (events[idx].data.fd == g.tfd) {
                timer_cb(&g, events[idx].events);
            } else {
                event_cb(&g, events[idx].data.fd, events[idx].events);
            }
        }
    }

    fprintf(MSG_OUT, "Finished all in progress downloads.\n");
    fflush(MSG_OUT);

    curl_multi_cleanup(g.multi);
    curl_global_cleanup();

    return (NULL);
}

int
main(int argc, char **argv)
{
    should_exit = 0;
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler); /* SIGKILL cannot be caught; handle SIGTERM instead */

    mysql_start();
    crawler_init();
    mysql_stop();


    printf("Exiting.\n");

    return (0);
}

Typical output is now:

Parsed sites: 42, 6 parallel connections, 4 still running, 6 transfers    Exiting normally.
Parsed sites: 48, 0 parallel connections, 0 still running, 0 transfers    Finished all in progress downloads.
Exiting.

As you can see, the application doesn't come anywhere near the hard-coded maximum number of parallel connections. Not even close. And it only consumes about 3 Mbps of bandwidth.

Why? I don't understand why merely adding MySQL code stops the program from running at its full potential. It never reaches the hard-coded maximum number of parallel connections.

Any idea what's wrong with this code?

EDIT

Here is some gprof output:

Flat profile:

Each sample counts as 0.01 seconds.
  %   cumulative   self              self     total           
 time   seconds   seconds    calls  ms/call  ms/call  name    
 44.49      0.04     0.04    12668     0.00     0.00  mysql_url_visited_find
 33.37      0.07     0.03    12668     0.00     0.00  mysql_url_frontier_push
 11.12      0.08     0.01    12668     0.00     0.00  mysql_url_frontier_find
 11.12      0.09     0.01      100     0.10     0.90  html_link_find
  0.00      0.09     0.00    17355     0.00     0.00  starts_with
  0.00      0.09     0.00    12669     0.00     0.00  url_sanitize
  0.00      0.09     0.00     2651     0.00     0.00  mcode_or_die
  0.00      0.09     0.00     2432     0.00     0.04  check_multi_info
  0.00      0.09     0.00     2420     0.00     0.04  event_cb
  0.00      0.09     0.00     1288     0.00     0.00  print_progress
  0.00      0.09     0.00      425     0.00     0.00  concurrent_connections_dec
  0.00      0.09     0.00      425     0.00     0.00  concurrent_connections_inc
  0.00      0.09     0.00      425     0.00     0.00  setsock
  0.00      0.09     0.00      303     0.00     0.00  remsock
  0.00      0.09     0.00      299     0.00     0.00  addsock
  0.00      0.09     0.00      219     0.00     0.00  transfers_dec
  0.00      0.09     0.00      219     0.00     0.00  transfers_inc
  0.00      0.09     0.00      116     0.00     0.00  mysql_url_frontier_pop
  0.00      0.09     0.00      116     0.00     0.00  new_head_conn
  0.00      0.09     0.00      103     0.00     0.00  new_body_conn
  0.00      0.09     0.00      100     0.00     0.90  html_parse
  0.00      0.09     0.00      100     0.00     0.00  html_title_find
  0.00      0.09     0.00      100     0.00     0.00  mysql_url_visited_push
  0.00      0.09     0.00      100     0.00     0.00  parsed_sites_inc
  0.00      0.09     0.00       32     0.00     0.01  timer_cb
  0.00      0.09     0.00        1     0.00    90.09  crawler_init
  0.00      0.09     0.00        1     0.00     0.00  mysql_start
  0.00      0.09     0.00        1     0.00     0.00  mysql_stop

All of your MySQL code is synchronous: your event loop does nothing while you wait for MySQL to respond to a query or acknowledge an insert, and you spend 90% of your time in MySQL-related functions. You never get close to the concurrent-connection limit because the database bottleneck limits the rate at which you can issue new HTTP requests, and the transfers already in flight finish while you sit waiting on the DB.

Fixing this isn't trivial, but some general suggestions:

  1. Avoid talking to the database where you can. The 'frontier' state looks like something you could keep in-process (a minimal in-memory queue is sketched after this list).

  2. Use an asynchronous client library to talk to the database, and integrate it into your event loop so that other work can continue while a query is outstanding (see the non-blocking sketch after this list).

  3. Make sure your database has the necessary indexes, or consider using something like Redis instead of MySQL, where the access patterns are more explicit, simple accesses are very fast, and you can't really "accidentally" write a badly performing query (see the last sketch below).