How to chunk shell script input by time, not by size?

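In a bash script I am using a many-producer single-consumer pattern. Producers are background processes writing lines into a fifo (via GNU Parallel). The consumer reads all lines from the fifo, then sorts, filters, and prints the formatted result to stdout.

However, it can take a long time until the full result is available. Producers are usually fast on the first few results but then slow down. Here I am more interested in seeing chunks of data every few seconds, each sorted and filtered individually.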

mkfifo fifo
parallel ... >"$fifo" &
while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
  process "$chunk"
done

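The loop would run until all producers are done and all input has been read. Each chunk is read until there has been no new data for 5s, or until 10s have passed since the chunk was started. A chunk may also be empty if there was no new data for 10s.

I tried to make it work like this: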

output=$(mktemp)
while true; do
  wasTimeout=0 interruptAt=$(( $(date '+%s') + 10 ))
  while true; do
    IFS= read -r -t5 <>"${fifo}"
    rc="$?"
    if [[ "${rc}" -gt 0 ]]; then
      [[ "${rc}" -gt 128 ]] && wasTimeout=1
      break
    fi
    echo "$REPLY" >>"${output}"
    if [[ $(date '+%s') -ge "${interruptAt}" ]]; then
      wasTimeout=1
      break
    fi
  done
  echo '---' >>"${output}"
  [[ "${wasTimeout}" -eq 0 ]] && break
done

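I tried a few variations. In the form above, it reads the first chunk and then loops forever. If I use <"${fifo}" (without the read/write open used above), it blocks after the first chunk. Maybe all of this could be simplified with buffer and/or stdbuf? But both of them define blocks by size, not by time.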
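
A note on these symptoms, offered as a sketch rather than a definitive fix. A redirection written on the `read` command itself reopens the fifo for every line. Opened with `<>`, the reader also counts as a writer, so end of input is never reported and every quiet period ends in a timeout. Opened with `<`, the open itself can block while no producer has the fifo open, and `-t` does not cover the open. Opening the fifo once on its own file descriptor avoids both. The function below assumes bash 4 or later (for `read -t` returning a status above 128 on timeout and keeping partial input). `read_chunk`, `CHUNK` and `CARRY` are names made up for this illustration, and timing relies on `SECONDS`, so it is only accurate to about one second.

```shell
#!/usr/bin/env bash
# Sketch only: read_chunk, CHUNK and CARRY are hypothetical names.

CHUNK=()    # lines of the most recent chunk
CARRY=''    # partial line kept when a read timed out in mid-line

# read_chunk FD IDLE MAX
#   Fill CHUNK with lines from file descriptor FD. Stop when IDLE seconds
#   pass without a new line (only once the chunk is non-empty), when MAX
#   seconds have passed since the call started, or at end of input.
#   Returns 0 if more input may follow, 1 at end of input.
read_chunk() {
    local fd=$1 idle=$2 max=$3
    local deadline=$(( SECONDS + max )) remaining tmo line rc
    CHUNK=()
    while true; do
        remaining=$(( deadline - SECONDS ))
        (( remaining > 0 )) || return 0      # chunk deadline reached
        tmo=$remaining
        if (( ${#CHUNK[@]} > 0 && idle < remaining )); then
            tmo=$idle                        # idle timer applies after the first line
        fi
        rc=0
        IFS= read -r -t "$tmo" -u "$fd" line || rc=$?
        if (( rc == 0 )); then
            CHUNK+=( "$CARRY$line" )
            CARRY=''
        elif (( rc > 128 )); then
            CARRY+=$line                     # timeout: keep any partial line for later
            return 0
        else
            # End of input: flush an unterminated last line, if any.
            if [[ -n $CARRY$line ]]; then
                CHUNK+=( "$CARRY$line" )
            fi
            CARRY=''
            return 1
        fi
    done
}

# Usage with the question's fifo (process is the question's placeholder):
#   exec 3<"$fifo"            # open once; waits for the first producer
#   while read_chunk 3 5 10; do process "${CHUNK[@]}"; done
#   process "${CHUNK[@]}"     # lines returned together with end of input
#   exec 3<&-
```

The last chunk arrives together with the end-of-input status, which is why the usage sketch processes `CHUNK` once more after the loop.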

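I would consider writing a safe multi-threaded program that uses queues.

I know Java better, but there may be more modern languages that suit the job, such as Go or Kotlin.

Something like this: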

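#!/usr/bin/perl

use strict;
use warnings;

my $timeout = 3;    # seconds without new input before a chunk is flushed
my @out;
while(<STDIN>) {
    # Make sure there is some input
    push @out,$_;
    eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm $timeout;
        while(<STDIN>) {
            # Restart the idle timer on every line
            alarm $timeout;
            push @out,$_;
        }
        alarm 0;
    };
    system "echo","process",@out;
    @out = ();      # start the next chunk empty
}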

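This is not a trivial problem to solve. As I hinted, a C program (or a program in some programming language other than the shell) is probably the best solution. Some of the complicating factors are:

  • Reading with timeouts.
  • The timeout changes if data arrives soon enough.
  • Different systems have different sets of interval timing functions:
    • alarm() is probably available everywhere, but has only 1-second resolution, which is liable to accumulated rounding errors. (Compile this version with make UFLAGS=-DUSE_ALARM; on macOS, use make UFLAGS=-DUSE_ALARM LDLIB2=.)
    • setitimer() uses microsecond timing and the struct timeval type. (Compile this version with make UFLAGS=-DUSE_SETITIMER; on macOS, use make UFLAGS=-DUSE_SETITIMER LDLIB2=.)
    • timer_create(), timer_settime(), etc. use the modern nanosecond type struct timespec. This is available on Linux; it is not available on macOS 10.14.5 Mojave or earlier. (Compile this version with make; it will not work on macOS.)

The program usage message is: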

$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
  -c chunk  Maximum time to wait for data in a chunk (default 10)
  -d delay  Maximum delay after line read (default: 5)
  -f file   Read from file instead of standard input
  -h        Print this help message and exit
  -v        Verbose mode: print timing information to stderr
  -V        Print version information and exit

$

This code is available in my SOQ (Stack Overflow Questions) repository on GitHub as file chunker79.c in the src/so-5631-4784 sub-directory. You will also need some of the support code from the src/libsoq directory.

/*
@(#)File:           chunker79.c
@(#)Purpose:        Chunk Reader for SO 5631-4784
@(#)Author:         J Leffler
@(#)Copyright:      (C) JLSS 2019
*/

/*TABSTOP=4*/

/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel).  The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available.  Producers are usually fast on the first few results but
** then would slow down.  Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
**    mkfifo fifo
**    parallel ... >"$fifo" &
**    while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
**      process "$chunk"
**    done
**
** The loop would run until all producers are done and all input is
** read.  Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started.  A chunk may also
** be empty if there was no new data for 10s.
*/

/*
** Analysis
**
** 1.  If no data arrives at all for 10 seconds, then the program should
**     terminate producing no output.  This timeout is controlled by the
**     value of time_chunk in the code.
** 2.  If data arrives more or less consistently, then the collection
**     should continue for 10s and then finish.  This timeout is also
**     controlled by the value of time_chunk in the code.
** 3.  If a line of data arrives before 5 seconds have elapsed, and no
**     more arrives for 5 seconds, then the collection should finish.
**     (If the first line arrives after 5 seconds and no more arrives
**     for more than 5 seconds, then the 10 second timeout cuts in.)
**     This timeout is controlled by the value of time_delay in the code.
** 4.  This means that we want two separate timers at work:
**     - Chunk timer (started when the program starts).
**     - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored.  External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one timer, not two.
** -- Using getitimer(2), setitimer(2) uses obsolescent POSIX functions,
**    but these are available on macOS.
** -- Using timer_create(2), timer_delete(2), timer_settime(2),
**    timer_gettime(2) uses current POSIX functions but is not available
**    on macOS.
*/

#include "posixver.h"

#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>

#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */

static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
    "  -c chunk  Maximum time to wait for data in a chunk (default 10)\n"
    "  -d delay  Maximum delay after line read (default: 5)\n"
    "  -f file   Read from file instead of standard input\n"
    "  -h        Print this help message and exit\n"
    "  -v        Verbose mode: print timing information to stderr\n"
    "  -V        Print version information and exit\n"
    ;

static struct timespec time_delay = { .tv_sec =  5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;

static bool verbose = false;

static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);

// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);

#if defined(USE_ALARM)

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(time_chunk.tv_sec);
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    unsigned time_left = alarm(0);
    if (time_left > time_delay.tv_sec)
        alarm(time_delay.tv_sec);
    else
        alarm(time_left);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(0);
    signal(SIGALRM, SIG_IGN);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#elif defined(USE_SETITIMER)

static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
    return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
    tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_until;
    if (getitimer(ITIMER_REAL, &tv_until) != 0)
        err_syserr("failed to get interval timer: ");
    struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);

    if (verbose)
    {
        char buff1[32];
        fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
        char buff2[32];
        fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
        err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
    }

    if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
        tv_new.it_value = cvt_timespec_to_timeval(time_delay);
        struct itimerval tv_old;
        if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
            err_syserr("failed to set interval timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new =
    {
        .it_value    = { .tv_sec = 0, .tv_usec = 0 },
        .it_interval = { .tv_sec = 0, .tv_usec = 0 },
    };
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#else /* USE_TIMER_GETTIME */

#include "timespec_math.h"

static timer_t t0 = { 0 };

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);

    struct sigevent ev =
    {
        .sigev_notify = SIGEV_SIGNAL,
        .sigev_signo = SIGALRM,
        .sigev_value.sival_int = 0,
        .sigev_notify_function = 0,
        .sigev_notify_attributes = 0,
    };
    if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
        err_syserr("failed to create a timer: ");

    struct itimerspec it =
    {
        .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
        .it_value = time_chunk,
    };
    struct itimerspec ot;
    if (timer_settime(t0, 0, &it, &ot) != 0)
        err_syserr("failed to activate timer: ");

    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerspec time_until;
    if (timer_gettime(t0, &time_until) != 0)
        err_syserr("failed to get per-process timer: ");

    /* Report timing details only in verbose mode, as in the setitimer() variant */
    if (verbose)
    {
        char buff1[32];
        fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
        char buff2[32];
        fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
        err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
    }

    if (cmp_timespec(time_until.it_value, time_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerspec time_new =
        {
            .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
            .it_value = time_delay,
        };
        struct itimerspec time_old;
        if (timer_settime(t0, 0, &time_new, &time_old) != 0)
            err_syserr("failed to set per-process timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (timer_delete(t0) != 0)
        err_syserr("failed to delete timer: ");
}

#endif /* Timing mode */

/* Writing to stderr via err_remark() is not officially supported */
static void alarm_handler(int signum)
{
    assert(signum == SIGALRM);
    if (verbose)
        err_remark("---- %s(): signal %d\n", __func__, signum);
}

static void read_chunks(FILE *fp)
{
    size_t num_data = 0;
    size_t max_data = 0;
    struct iovec *data = 0;
    size_t buflen = 0;
    char *buffer = 0;
    ssize_t length;
    size_t chunk_len = 0;

    clock_gettime(CLOCK_REALTIME, &time_start);

    set_chunk_timeout();
    while ((length = getline(&buffer, &buflen, fp)) != -1)
    {
        if (num_data >= max_data)
        {
            size_t new_size = (num_data * 2) + 2;
            void *newspace = realloc(data, new_size * sizeof(data[0]));
            if (newspace == 0)
                err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
            data = newspace;
            max_data = new_size;
        }
        data[num_data].iov_base = buffer;
        data[num_data].iov_len = length;
        num_data++;
        if (verbose)
            err_remark("Received line %zu\n", num_data);
        chunk_len += length;
        buffer = 0;
        buflen = 0;
        set_delay_timeout();
    }
    cancel_timeout();

    if (chunk_len > 0)
    {
        if ((length = writev(STDOUT_FILENO, data, num_data)) < 0)
            err_syserr("failed to write %zu bytes to standard output: ", chunk_len);
        else if ((size_t)length != chunk_len)
            err_error("failed to write %zu bytes to standard output "
                      "(short write of %zu bytes)\n", chunk_len, (size_t)length);
    }

    if (verbose)
        err_remark("---- %s(): data written (%zu bytes)\n", __func__, chunk_len);

    for (size_t i = 0; i < num_data; i++)
        free(data[i].iov_base);
    free(data);
    free(buffer);
}

int main(int argc, char **argv)
{
    const char *name = "(standard input)";
    FILE *fp = stdin;
    err_setarg0(argv[0]);
    err_setlogopts(ERR_MICRO);

    int opt;
    while ((opt = getopt(argc, argv, optstr)) != -1)
    {
        switch (opt)
        {
        case 'c':
            if (scn_timespec(optarg, &time_chunk) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'd':
            if (scn_timespec(optarg, &time_delay) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'f':
            if ((fp = fopen(optarg, "r")) == 0)
                err_syserr("Failed to open file '%s' for reading: ", optarg);
            name = optarg;
            break;
        case 'h':
            err_help(usestr, hlpstr);
            /*NOTREACHED*/
        case 'v':
            verbose = true;
            break;
        case 'V':
            err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
            /*NOTREACHED*/
        default:
            err_usage(usestr);
            /*NOTREACHED*/
        }
    }

    if (optind != argc)
        err_usage(usestr);

    if (verbose)
    {
        err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
        err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
        err_remark("file:  %s\n", name);
    }

    read_chunks(fp);

    return 0;
}

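My SOQ repository also has a script, gen-data.sh, which uses some custom programs to generate a data stream like this (the seed value is written to standard error, not standard output):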

$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$

When this is fed into chunker79 with default options, I get output like:

$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$

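If you analyze the time intervals (look at the first two fields in the output lines), the output meets the specification. A more detailed analysis looks like this: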

$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000]  -  5.026s  -  chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00]  -  7.838s  -  gen-data.sh
$

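In this setup there is an apparent pause between the output from chunker79 appearing and gen-data.sh completing. That is because Bash waits for all processes in a pipeline to complete, and gen-data.sh does not complete until the next time it writes to the pipe after chunker79 has exited. This is an artefact of the test setup; it would not be a factor in the shell script outlined in the question.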
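
The pause described above can be reproduced without gen-data.sh or chunker79. The following is a minimal sketch, assuming `head -n 1` as a stand-in for a consumer that exits early and a brace group as a stand-in for a slow producer; `time_pipeline` and `ELAPSED` are hypothetical names. The first line appears at once, but the pipeline only returns after the producer's next write fails because nobody is reading the pipe any more.

```shell
#!/usr/bin/env bash
# Sketch only: time_pipeline and ELAPSED are hypothetical names.

# Run a producer that writes, pauses for two seconds and writes again,
# piped into a consumer that exits after the first line.
# Stores the whole-second duration of the pipeline in ELAPSED.
time_pipeline() {
    local start=$SECONDS
    # "|| true" ignores the producer's failure status if pipefail is set.
    { echo first; sleep 2; echo second; } 2>/dev/null | head -n 1 || true
    ELAPSED=$(( SECONDS - start ))
}

time_pipeline
echo "pipeline took ${ELAPSED}s"
```

The reported duration should be at least two seconds even though `head` finished almost immediately. The status 0x8D00 shown for gen-data.sh in the timecmd output appears consistent with this: 0x8D is 141, which is 128 plus the signal number of SIGPIPE.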

GNU Parallel 20200122 introduced --blocktimeout (--bt):

find ~ | parallel -j3 --bt 2s --pipe wc

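This works like normal GNU Parallel, except when it takes more than 2 seconds to fill a block. In that case the block read so far is simply passed to wc (unless it is empty).

It has a slightly odd startup behaviour: you have to wait 3*2s (jobslots*timeout) before the output stabilizes, after which you get output at least every 2s.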