高效 data/string 从文件 (CSV) c 读取和复制

Efficient data/string reading and copy from file (CSV) c

我已经阅读了有关从文件复制数据的其他帖子。让我告诉你为什么我的情况不同。 使用 C 我必须从一个 csv 文件中读取 4300 万行输入。条目没有错误,格式如下:

int, int , int , variable length string with only alphanumerics and spaces , int \n

事情是我正在将所有数据复制到内存中的数组和列表中以对其进行一些非常非常简单的平均,然后将所有处理过的数据输出到一个文件中,没什么特别的。 我主要在三个方面需要帮助:

  1. 关于字符串,(我最大的问题在这里)它首先从文件中读取,然后复制到一个数组,然后传递给另一个函数,只有在以下情况下才会将其复制到动态内存中满足一个条件。例如:

    fileAnalizer(){
      while ( ! EOF ){
        char * s = function_to_read_string_from_file();
        data_to_array(s);
      }
      ....
      ....
      processData(arrays);
      dataToFiles(arrays);
    
    }
    
    void data_to_structures(char * s){
      if ( condition is met)
        char * aux = malloc((strlen(s)+1 )* sizeof(char));
        strcpy(aux,s);
      ....
      ...
    }
    

    如您所见,字符串遍历了 3 次。我需要一种方法来更有效地执行此过程,减少字符串的传输次数。我尝试逐个读取一个字符并计算字符串长度,但整个过程变慢了。

  2. 高效读取输入:您是否建议首先将所有数据复制到缓冲区中?如果是这样:什么类型的缓冲区,在许多块中还是只有一个? 这是我目前的阅读计划:

    void
    csvReader(FILE* f){
        T_structCDT c;
        c.string = malloc(MAX_STRING_LENGHT*sizeof(char));
        while (fscanf(f,"%d,%d,%d,%[^,],%d\n",&c.a, &c.b, &c.vivienda, c.c, &c.d)==5 ){
            data_to_structures(c);
        }
    }
    
  3. 然后我有将近 50000 行处理过的数据转储到其他文件中。你会如何推荐倾销?逐行或再次将数据发送到缓冲区然后进行转储?我的代码现在看起来像这样。

    void dataToFiles(arrayOfStructs immenseAr1, arrayOfStructs immenseAr2){
      for (iteration over immenseAr1) {
          fprintf(f1, "%d,%d,%s\n", immenseAr1[i].a, immenseAr1[i].b, inmenseAr1[i].string);
      }
      for (iteration over immenseAr2) {
          fprintf(f2, "%d,%d,%s\n", inmenseAr2[i].a, inmenseAr2[i].b, inmenseAr2[i].string);
      }
    }
    

我必须在转储之前读取所有数据。除了将所有数据存储到内存中然后对其进行分析然后转储所有分析数据之外,您会推荐一种不同的方法吗? 对于 2000 万行的程序,该程序目前需要 40 多秒。 我真的需要缩短那个时间。

  1. 使用 aux=strdup(s); 代替 calloc()、strlen() 和 strcpy()。

  2. 您的 OS(文件系统)通常可以非常有效地缓冲数据流。人们可能会找到一种更有效的缓冲数据流的方法,但这种尝试通常最终只会冗余地缓冲 OS 已经缓冲的内容。您的 OS 很可能提供特定功能,使您能够绕过通常由 OS/filesystem 完成的缓冲。通常,这意味着不使用 "stdio.h" 函数,例如 fscanf() 等

  3. 再次提醒,注意不要对数据进行不必要的双重缓冲。请记住,OS 将缓冲您的数据,并在通常最有效的时候实际将其写出。 (这就是为什么有一个 fflush() 函数......向 OS 建议你将等到它写入所有数据后再继续。)而且,就像通常有特定的 OS函数绕过OS读缓冲区,通常有OS特定函数绕过OS写缓冲区。但是,这些功能很可能超出了您(也许还有这群观众)的需求范围。

我的总结回答(如上所述)是试图超越 OS 以及它缓冲数据流的方式通常会导致代码效率较低。

选择合适的工具

有这么多数据(你说的是来自 csv 文件的 4300 万输入行),硬盘 I/O 将成为瓶颈,并且由于你正在处理对于平面文本文件,每次您需要进行不同的计算(如果您改变主意并希望进行稍微不同的计算 非常非常简单的平均值,然后将所有处理后的数据输出到一个文件), 你每次都需要经历所有这些过程。

更好的策略是使用数据库管理系统,它是存储和处理大量数据的合适工具,可以让您灵活地进行任何需要的处理,使用索引数据,高效处理内存和缓存等,使用简单的 SQL 命令。

如果您不想设置 SQL 服务器(例如 MySQL 或 PostgreSQL),您可以使用不需要的数据库管理系统服务器,例如 SQLite: http://www.sqlite.org/ which you can, in addition, drive from the command-line with the sqlite3 shell program, or from a C program if you wish (SQLite is actually a C library), or by using with a GUI interface, such as http://sqlitestudio.pl/

SQLite 将允许您创建数据库,创建 Table,将 CSV 文件导入其中,进行计算,并以各种格式转储结果,...

使用 SQLite

的演练示例

这是一个入门示例,说明了 sqlite3 shell 程序和 C 代码的使用。

假设您的数据在 data1.csv 中,采用您描述的格式,包含:

1,2,3,variable length string with only alphanumerics and spaces,5
11,22,33,other variable length string with only alphanumerics and spaces,55
111,222,333,yet another variable length string with only alphanumerics and spaces,555

并在 data2.csv 中包含:

2,3,4,from second batch variable length string with only alphanumerics and spaces,6
12,23,34,from second batch other variable length string with only alphanumerics and spaces,56
112,223,334,from second batch yet another variable length string with only alphanumerics and spaces,556

您使用 sqlite3 命令行实用程序创建数据库,使用正确的格式创建 table,导入 CSV 文件,然后发出 SQL 命令,如下所示:

$ sqlite3 bigdatabase.sqlite3
SQLite version 3.8.7.1 2014-10-29 13:59:56
Enter ".help" for usage hints.
sqlite> create table alldata(col1 int, col2 int, col3 int, col4 varchar(255), col5 int);
sqlite> .mode csv
sqlite> .import data1.csv alldata
sqlite> .import data2.csv alldata
sqlite> select * from alldata;
1,2,3,"variable length string with only alphanumerics and spaces",5
11,22,33,"other variable length string with only alphanumerics and spaces",55
111,222,333,"yet another variable length string with only alphanumerics and spaces",555
2,3,4,"from second batch variable length string with only alphanumerics and spaces",6
12,23,34,"from second batch other variable length string with only alphanumerics and spaces",56
112,223,334,"from second batch yet another variable length string with only alphanumerics and spaces",556
sqlite> select avg(col2) from alldata;
82.5
sqlite> 

(按 Ctrl-D 退出 SQL 站点 shell)

上面,我们创建了一个 bigdatabase.sqlite3 文件,其中包含由 SQLite 处理的已创建数据库,一个 table alldata,我们将 CSV 数据导入其中,显示其中包含的数据(不要在 4300 万行上这样做),并且(计算并)显示我们命名为 col2 的列中包含的整数的平均值,该列恰好是第二列。

您可以将创建的 SQLite 数据库与 C 和 SQLite 库一起使用,以实现相同的目的。

创建一个 sqlite-average.c 文件(改编自 SQLite's Quickstart Page 中的示例),如下所示:

#include <stdio.h>
#include <sqlite3.h>

static int callback(void *NotUsed, int argc, char **argv, char **azColName) {
    int i;
    for(i=0; i<argc; i++){
        printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
    }
    printf("\n");
    return 0;
}

int main(void) {
    sqlite3 *db;
    char *zErrMsg = 0;
    int rc;

    rc = sqlite3_open("bigdatabase.sqlite3", &db);                                                      
    if (rc) {
        fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
        sqlite3_close(db);
        return 1;
    }
    rc = sqlite3_exec(db, "select avg(col2) from alldata;", callback, 0, &zErrMsg);
    if (rc!=SQLITE_OK){
        fprintf(stderr, "SQL error: %s\n", zErrMsg);
        sqlite3_free(zErrMsg);
    }
    sqlite3_close(db);

    return 0;
}

编译它,链接到已安装的 SQLite 库,使用 gcc,你可以这样做:

$ gcc -Wall sqlite-average.c -lsqlite3

运行编译后的executable:

$ ./a.out
avg(col2) = 82.5

$

您可能希望为查找数据的列创建索引,例如此 table 中的第 2 列和第 5 列,以便更快地获取信息:

sqlite> create index alldata_idx ON alldata(col2,col5);

决定(如果适用)哪个列将包含 table 等的主键

有关更多信息,请查看:

尝试扫描您的大文件而不将其全部存储到内存中,一次只在局部变量中保留一条记录:

void csvReader(FILE *f) {
    T_structCDT c;
    int count = 0;
    c.string = malloc(1000);
    while (fscanf(f, "%d,%d,%d,%999[^,],%d\n", &c.a, &c.b, &c.vivienda, c.c, &c.d) == 5) {
        // nothing for now
        count++;
    }
    printf("%d records parsed\n");
}

测量这个简单的解析器所花费的时间:

  • 如果足够快,执行选择测试,并在解析阶段找到匹配的几条记录,一次一条输出。这些步骤的额外时间应该相当少,因为只有少数记录匹配。

  • 如果时间太长,你需要一个更花哨的 CSV 解析器,这是很多工作,但可以完成并快速完成,特别是如果你可以假设你的输入文件使用它所有记录的简单格式。这是一个太宽泛的主题,这里无法详细说明,但可达到的速度应该接近 cat csvfile > /dev/nullgrep a_short_string_not_present csvfile

在我的系统上(平均 linux 普通硬盘服务器),从冷启动开始解析 4000 万行总计 2GB 的代码不到 20 秒,第二次不到 4 秒:磁盘I/O 似乎是瓶颈。

如果您需要经常执行此选择,您可能应该使用不同的数据格式,可能是数据库系统。如果偶尔对格式固定的数据执行扫描,使用更快的存储(如 SSD)会有所帮助,但不要指望奇迹。

编辑 为了付诸行动,我写了一个简单的生成器和提取器:

这是一个生成 CSV 数据的简单程序:

#include <stdio.h>
#include <stdlib.h>

const char *dict[] = {
    "Lorem", "ipsum", "dolor", "sit", "amet;", "consectetur", "adipiscing", "elit;",
    "sed", "do", "eiusmod", "tempor", "incididunt", "ut", "labore", "et",
    "dolore", "magna", "aliqua.", "Ut", "enim", "ad", "minim", "veniam;",
    "quis", "nostrud", "exercitation", "ullamco", "laboris", "nisi", "ut", "aliquip",
    "ex", "ea", "commodo", "consequat.", "Duis", "aute", "irure", "dolor",
    "in", "reprehenderit", "in", "voluptate", "velit", "esse", "cillum", "dolore",
    "eu", "fugiat", "nulla", "pariatur.", "Excepteur", "sint", "occaecat", "cupidatat",
    "non", "proident;", "sunt", "in", "culpa", "qui", "officia", "deserunt",
    "mollit", "anim", "id", "est", "laborum.",
};

int csvgen(const char *fmt, long lines) {
    char buf[1024];

    if (*fmt == '[=11=]')
        return 1;

    while (lines > 0) {
        size_t pos = 0;
        int count = 0;
        for (const char *p = fmt; *p && pos < sizeof(buf); p++) {
            switch (*p) {
            case '0': case '1': case '2': case '3': case '4':
            case '5': case '6': case '7': case '8': case '9':
                count = count * 10 + *p - '0';
                continue;
            case 'd':
                if (!count) count = 101;
                pos += snprintf(buf + pos, sizeof(buf) - pos, "%d",
                                rand() % (2 + count - 1) - count + 1);
                count = 0;
                continue;
            case 'u':
                if (!count) count = 101;
                pos += snprintf(buf + pos, sizeof(buf) - pos, "%u",
                                rand() % count);
                count = 0;
                continue;
            case 's':
                if (!count) count = 4;
                count = rand() % count + 1;
                while (count-- > 0 && pos < sizeof(buf)) {
                    pos += snprintf(buf + pos, sizeof(buf) - pos, "%s ",
                                    dict[rand() % (sizeof(dict) / sizeof(*dict))]);
                }
                if (pos < sizeof(buf)) {
                    pos--;
                }
                count = 0;
                continue;
            default:
                buf[pos++] = *p;
                count = 0;
                continue;
            }
        }
        if (pos < sizeof(buf)) {
            buf[pos++] = '\n';
            fwrite(buf, 1, pos, stdout);
            lines--;
        }
    }
    return 0;
}

int main(int argc, char *argv[]) {
    if (argc < 3) {
        fprintf(stderr, "usage: csvgen format number\n");
        return 2;
    }
    return csvgen(argv[1], strtol(argv[2], NULL, 0));
}

这是一个具有 3 种不同解析方法的提取器:

#include <stdio.h>
#include <stdlib.h>

static inline unsigned int getuint(const char *p, const char **pp) {
    unsigned int d, n = 0;
    while ((d = *p - '0') <= 9) {
        n = n * 10 + d;
        p++;
    }
    *pp = p;
    return n;
}

int csvgrep(FILE *f, int method) {
    struct {
        int a, b, c, d;
        int spos, slen;
        char s[1000];
    } c;
    int count = 0, line = 0;

    // select 500 out of 43M
#define select(c)  ((c).a == 100 && (c).b == 100 && (c).c > 74 && (c).d > 50)

    if (method == 0) {
        // default method: fscanf
        while (fscanf(f, "%d,%d,%d,%999[^,],%d\n", &c.a, &c.b, &c.c, c.s, &c.d) == 5) {
            line++;
            if (select(c)) {
                count++;
                printf("%d,%d,%d,%s,%d\n", c.a, c.b, c.c, c.s, c.d);
            }
        }
    } else
    if (method == 1) {
        // use fgets and simple parser
        char buf[1024];
        while (fgets(buf, sizeof(buf), f)) {
            char *p = buf;
            int i;
            line++;
            c.a = strtol(p, &p, 10);
            p += (*p == ',');
            c.b = strtol(p, &p, 10);
            p += (*p == ',');
            c.c = strtol(p, &p, 10);
            p += (*p == ',');
            for (i = 0; *p && *p != ','; p++) {
                c.s[i++] = *p;
            }
            c.s[i] = '[=12=]';
            p += (*p == ',');
            c.d = strtol(p, &p, 10);
            if (*p != '\n') {
                fprintf(stderr, "csvgrep: invalid format at line %d\n", line);
                continue;
            }
            if (select(c)) {
                count++;
                printf("%d,%d,%d,%s,%d\n", c.a, c.b, c.c, c.s, c.d);
            }
        }
    } else
    if (method == 2) {
        // use fgets and hand coded parser, positive numbers only, no string copy
        char buf[1024];
        while (fgets(buf, sizeof(buf), f)) {
            const char *p = buf;
            line++;
            c.a = getuint(p, &p);
            p += (*p == ',');
            c.b = getuint(p, &p);
            p += (*p == ',');
            c.c = getuint(p, &p);
            p += (*p == ',');
            c.spos = p - buf;
            while (*p && *p != ',') p++;
            c.slen = p - buf - c.spos;
            p += (*p == ',');
            c.d = getuint(p, &p);
            if (*p != '\n') {
                fprintf(stderr, "csvgrep: invalid format at line %d\n", line);
                continue;
            }
            if (select(c)) {
                count++;
                printf("%d,%d,%d,%.*s,%d\n", c.a, c.b, c.c, c.slen, buf + c.spos, c.d);
            }
        }
    } else {
        fprintf(stderr, "csvgrep: unknown method: %d\n", method);
        return 1;
    }
    fprintf(stderr, "csvgrep: %d records selected from %d lines\n", count, line);
    return 0;
}

int main(int argc, char *argv[]) {
    if (argc > 2 && strtol(argv[2], NULL, 0)) {
        // non zero second argument -> set a 1M I/O buffer
        setvbuf(stdin, NULL, _IOFBF, 1024 * 1024);
    }
    return csvgrep(stdin, argc > 1 ? strtol(argv[1], NULL, 0) : 0);
}

下面是一些比较基准数据:

$ time ./csvgen "u,u,u,s,u" 43000000 > 43m
real    0m34.428s    user    0m32.911s    sys     0m1.358s
$ time grep zz 43m
real    0m10.338s    user    0m10.069s    sys     0m0.211s
$ time wc -lc 43m
 43000000 1195458701 43m
real    0m1.043s     user    0m0.839s     sys     0m0.196s
$ time cat 43m > /dev/null
real    0m0.201s     user    0m0.004s     sys     0m0.195s
$ time ./csvgrep 0 < 43m > x0
csvgrep: 508 records selected from 43000000 lines
real    0m14.271s    user    0m13.856s    sys     0m0.341s
$ time ./csvgrep 1 < 43m > x1
csvgrep: 508 records selected from 43000000 lines
real    0m8.235s     user    0m7.856s     sys     0m0.331s
$ time ./csvgrep 2 < 43m > x2
csvgrep: 508 records selected from 43000000 lines
real    0m3.892s     user    0m3.555s     sys     0m0.312s
$ time ./csvgrep 2 1 < 43m > x3
csvgrep: 508 records selected from 43000000 lines
real    0m3.706s     user    0m3.488s     sys     0m0.203s
$ cmp x0 x1
$ cmp x0 x2
$ cmp x0 x3

如您所见,专门化解析方法提供了将近 50% 的收益,而手动编码整数转换和字符串扫描又获得了 50%。使用 1 兆字节缓冲区而不是默认大小仅提供 0.2 秒的边际增益。

为了进一步提高速度,您可以使用mmap()绕过I/O流接口并对文件内容进行更强的假设。在上面的代码中,无效格式仍然得到妥善处理,但您可以删除一些测试并以可靠性为代价将执行时间额外减少 5%。

以上基准测试是在具有 SSD 驱动器且文件 43m 适合 RAM 的系统上执行的,因此计时不包括太多磁盘 I/O 延迟。 grep 出奇的慢,增加搜索字符串长度更糟... wc -lc 设置扫描性能目标,4 倍,但 cat 似乎遥不可及。