需要测试以确定异步 I/O 是否实际发生在 C 代码中

Need test to determine if asynchronous I/O is actually happening in C code

我是异步新手 I/O。我需要让它在 Linux 系统上的一些 C 和 Fort运行 程序中工作。我设法编写了一些从两个文件异步读取的 C 测试代码(包含在下面)。代码编译和运行。不过,我想知道的是,我是否真正获得了异步 I/O,或者 I/O 是否真的是串行的?我正在处理的lustre文件系统有点过时,并不清楚它是否真正支持异步I/O,而且似乎没有人有明确的答案。所以我想知道是否有一些计时语句或任何类型的输出我可以添加到代码中以确定它是否以真正的异步方式运行。我打赌我需要比我正在处理的文件大得多的文件来进行有意义的测试。不知道我还需要什么。

密码是:

#include <stdio.h>
#include <stdlib.h>
/*  for "open()" ... */
#include <fcntl.h>
/* for "bzero()" ... */
#include<strings.h>
/* for asynch I/O ... */
#include <aio.h>
/* for EINPROGRESS ... */
#include <errno.h>
/* for "usleep()" ... */
#include <unistd.h>

#define BUFSIZE 1024


int main() {

        int fd0, fd1, readstat0, readstat1;
        struct aiocb *ioobjs[2];

        ioobjs[0] = malloc(sizeof(struct aiocb));
        ioobjs[1] = malloc(sizeof(struct aiocb));

        fd0 = open("file.txt", O_RDONLY);
        if (fd0 < 0) perror("open");
        fd1 = open("otherfile.txt", O_RDONLY);
        if (fd1 < 0) perror("open");

        bzero((char *)ioobjs[0], sizeof(struct aiocb));
        bzero((char *)ioobjs[1], sizeof(struct aiocb));

        ioobjs[0]->aio_buf = malloc(BUFSIZE+1);
        if (!ioobjs[0]->aio_buf) perror("malloc 0");
        ioobjs[1]->aio_buf = malloc(BUFSIZE+1);
        if (!ioobjs[1]->aio_buf) perror("malloc 0");

        ioobjs[0]->aio_fildes = fd0;
        ioobjs[0]->aio_nbytes = BUFSIZE;
        ioobjs[0]->aio_offset = 0;
        /* Don't forget this!  With list I/O, there is no
         * particular function call to make.  You have to
         * tell what you want to do via this member of
         * your aiocb struct:
         */
        ioobjs[0]->aio_lio_opcode = LIO_READ;
        ioobjs[1]->aio_fildes = fd1;
        ioobjs[1]->aio_nbytes = BUFSIZE;
        ioobjs[1]->aio_offset = 0;
        ioobjs[1]->aio_lio_opcode = LIO_READ;

        readstat0 = aio_read(ioobjs[0]);
        if (readstat0 < 0) perror("reading 0");
        readstat1 = aio_read(ioobjs[1]);
        if (readstat1 < 0) perror("reading 1");

        lio_listio(LIO_NOWAIT, ioobjs, 2, NULL);


        /* don't completely understand.  gives system time to
         * "wrap things up".  without this, one of the outputs
         * below (maybe both) will have no output to give.
         */
        usleep(100);

        if ((readstat0 = aio_return( ioobjs[0] )) > 0) {
                printf(">>>\n");
                printf("%s\n", (char *)(ioobjs[0]->aio_buf));
                printf("<<<\n");
        } else {
                perror("return");
        }
        if ((readstat1 = aio_return( ioobjs[1] )) > 0) {
                printf(">>>\n");
                printf("%s\n", (char *)(ioobjs[1]->aio_buf));
                printf("<<<\n");
        } else {
                perror("return");
        }


}

来自 man aio,请注意 aio_* 完全是一个 glibc [用户空间] 实现。

因此,如前所述,它有一些限制。

按时间查看正在发生的事情的方法是使用带有时间戳的事件日志。

天真的方法是只使用 [debug] printf 调用。但是,对于精确时间测量,printf 的开销可能会扰乱 real/actual 计时。也就是说,我们不测量“被测系统”,而是“被测系统 + timing/benchmark 开销”。

一种方法是 运行 您的程序在 strace 下使用适当的时间戳选项。 strace 日志 包含有关使用的 系统调用 的信息。但是,因为 aio 是在用户空间中实现的,它 可能 无法深入到足够细的粒度。而且,strace 本身会产生开销。

另一种方法是创建 trace/event 日志机制并检测您的代码。基本上,它实现了一个固定长度的“跟踪元素”环形队列。所以,trace数据是存在内存中的,速度非常快。

可以帮助解决此问题的标准实用程序是 dtrace。我自己没有这样做,因为我更喜欢“自己动手”。请参阅下面我使用过的一些实际代码。

然后,使用(例如)检测您的代码:

evtadd(TRACE_HELLO,"hello world");

其中 TRACE_* 来自您根据需要定义的 enum


无论如何,这是我过去使用的一些事件跟踪代码。可以扩展事件结构以添加您希望在事件点存储的任何额外数据。

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <sys/syscall.h>
#include <pthread.h>

typedef long long tsc_t;
typedef unsigned long long u32;
typedef unsigned long long u64;

// trace buffer element
typedef struct {
    tsc_t evt_tsc;
    pid_t evt_tid;
    const char *evt_name;
    u64 evt_xid;
} evtelem_t;

// trace buffer indexes
typedef struct {
    u32 ring_enq;
    u32 ring_deq;
} evtring_t;

// trace buffer control
typedef struct {
    evtring_t que_ring;
    u32 que_max;
    evtelem_t *que_base;
    pthread_mutex_t que_lock;
} evtctl_t;

evtctl_t evtctl;

// advance queue index [wrap if necessary]
#define EVTINC(_qidx) \
    do { \
        _qidx += 1; \
        if (_qidx >= evtctl.que_max) \
            _qidx = 0; \
    } while (0)

// tscget -- get timestamp
tsc_t
tscget(void)
{
    struct timespec ts;
    static tsc_t tsczero = 0;
    tsc_t tsc;

    clock_gettime(CLOCK_MONOTONIC,&ts);

    tsc = ts.tv_sec;
    tsc *= 1000000000;
    tsc += ts.tv_nsec;

    if (tsczero == 0)
        tsczero = tsc;

    tsc -= tsczero;

    return tsc;
}

// tscsec -- convert timestamp to fractional seconds
double
tscsec(tsc_t tsc)
{
    double sec;

    sec = tsc;
    sec /= 1e9;

    return sec;
}

// evtptr -- point to trace element
evtelem_t *
evtptr(u32 idx)
{

    return &evtctl.que_base[idx];
}

// evtinit -- initialize trace
void
evtinit(u32 qmax)
{

    evtctl.que_base = calloc(qmax,sizeof(evtelem_t));
    evtctl.que_max = qmax;

    evtctl.que_ring.ring_deq = 0;
    evtctl.que_ring.ring_enq = 0;

    pthread_mutex_init(&evtctl.que_lock,NULL);
}

// evtnew -- locate new event slot
evtelem_t *
evtnew(void)
{
    evtring_t *nring;
    evtelem_t *evt;

    pthread_mutex_lock(&evtctl.que_lock);

    nring = &evtctl.que_ring;

    evt = evtptr(nring->ring_enq);

    // advance enqueue pointer [wrap if necessary]
    EVTINC(nring->ring_enq);

    // if queue full advance dequeue pointer to maintain space
    if (nring->ring_enq == nring->ring_deq)
        EVTINC(nring->ring_deq);

    pthread_mutex_unlock(&evtctl.que_lock);

    return evt;
}

// evtadd -- add trace element
evtelem_t *
evtadd(u64 xid,const char *name)
{
    tsc_t tsc;
    evtelem_t *evt;

    tsc = tscget();

    evt = evtnew();
    evt->evt_tsc = tsc;
    evt->evt_xid = xid;
    evt->evt_name = name;
    evt->evt_tid = syscall(SYS_gettid);

    return evt;
}

// _evtdump -- dump queue element
void
_evtdump(evtelem_t *evt,tsc_t *tscptr,FILE *xfinfo)
{
    tsc_t tscprev;
    tsc_t tscnow;
    double elap;
    double delta;

    // get timestamp for this entry
    tscnow = evt->evt_tsc;

    tscprev = *tscptr;
    if (tscprev == 0)
        tscprev = tscnow;

    // get time delta from previous entry
    tscprev = tscnow - tscprev;
    delta = tscsec(tscprev);

    // get elapsed time from start
    elap = tscsec(tscnow);

    fprintf(xfinfo,"%.9f/%.9f %8d %s [%llu]\n",
        elap,delta,evt->evt_tid,evt->evt_name,evt->evt_xid);

    *tscptr = evt->evt_tsc;
}

// evtdump -- dump the queue
void
evtdump(const char *file)
{
    FILE *evtxf;
    evtring_t ring;
    evtelem_t *evt;
    tsc_t tscprev;

    evtxf = fopen(file,"w");
    if (evtxf == NULL) {
        perror(file);
        exit(1);
    }

    ring = evtctl.que_ring;

    // initialize previous timestamp
    tscprev = 0;

    while (ring.ring_enq != ring.ring_deq) {
        evt = evtptr(ring.ring_deq);
        EVTINC(ring.ring_deq);
        _evtdump(evt,&tscprev,evtxf);
    }

    fclose(evtxf);
}