如何按时间而不是按大小分块 shell 脚本输入?
How to chunk shell script input by time, not by size?
在 bash 脚本中,我使用了多生产者单消费者模式。生产者是将行写入 fifo 的后台进程(通过 GNU Parallel)。消费者从fifo中读取所有行,然后排序、过滤,并将格式化后的结果打印到stdout。
但是,可能需要很长时间才能获得完整结果。生产商通常在前几个结果上很快,但随后会放慢速度。在这里,我更感兴趣的是每隔几秒查看一次数据块,每个数据块都单独排序和过滤。
mkfifo fifo
parallel ... >"$fifo" &
while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
process "$chunk"
done
循环将 运行 直到所有生产者完成并读取所有输入。读取每个块,直到 5 秒内没有新数据,或者直到块开始后 10 秒。如果 10s 没有新数据,一个 chunk 也可能是空的。
我试着让它像这样工作:
output=$(mktemp)
while true; do
wasTimeout=0 interruptAt=$(( $(date '+%s') + 10 ))
while true; do
IFS= read -r -t5 <>"${fifo}"
rc="$?"
if [[ "${rc}" -gt 0 ]]; then
[[ "${rc}" -gt 128 ]] && wasTimeout=1
break
fi
echo "$REPLY" >>"${output}"
if [[ $(date '+%s') -ge "${interruptAt}" ]]; then
wasTimeout=1
break
fi
done
echo '---' >>"${output}"
[[ "${wasTimeout}" -eq 0 ]] && break
done
尝试了一些变体。在上面的表格中,它读取第一个块,然后永远循环。如果我使用 <"${fifo}"
(没有上面的 read/write),它会在第一个块之后阻塞。也许所有这些都可以用 buffer
and/or stdbuf
来简化?但是它们都是按大小而不是按时间来定义块的。
我会考虑用队列编写一个安全的 multi-threaded 程序。
我更了解 Java,但可能会有更现代的合适语言,例如 Go 和 Kotlin。
像这样:
#!/usr/bin/perl
$timeout = 3;
while(<STDIN>) {
# Make sure there is some input
push @out,$_;
eval {
local $SIG{ALRM} = sub { die };
alarm $timeout;
while(<STDIN>) {
alarm $timeout;
push @out,$_;
}
alarm 0;
};
system "echo","process",@out;
}
这不是一个需要解决的小问题。正如我所暗示的,C 程序(或使用 shell 以外的某种编程语言编写的程序)可能是最佳解决方案。一些复杂的因素是:
- 读取超时。
- 如果数据到达的足够快,超时会改变。
- 不同的系统有不同的间隔计时功能:
alarm()
可能随处可用,但只有 1 秒的分辨率,容易出现累积的舍入误差。 (使用 make UFLAGS=-DUSE_ALARM
编译此版本;在 macOS 上,使用 make UFLAGS=-DUSE_ALARM LDLIB2=
。)
setitimer()
使用微秒计时和 struct timeval
类型。 (使用 make UFLAGS=-DUSE_SETITIMER
编译此版本;在 macOS 上,使用 make UFLAGS=-DUSE_SETITIMER LDLIB2=
编译。)
timer_create()
和
timer_settime()
等使用现代纳秒类型 struct timespec
。这在 Linux 上可用;它不适用于 macOS 10.14.5 Mojave 或更早版本。 (使用 make
编译此版本;它不适用于 macOS。)
程序使用信息为:
$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
-c chunk Maximum time to wait for data in a chunk (default 10)
-d delay Maximum delay after line read (default: 5)
-f file Read from file instead of standard input
-h Print this help message and exit
-v Verbose mode: print timing information to stderr
-V Print version information and exit
$
此代码可在我的 SOQ (Stack Overflow Questions) repository on GitHub as file chunker79.c
in the src/so-5631-4784 sub-directory 中找到。您还需要 src/libsoq 目录中的一些支持代码。
/*
@(#)File: chunker79.c
@(#)Purpose: Chunk Reader for SO 5631-4784
@(#)Author: J Leffler
@(#)Copyright: (C) JLSS 2019
*/
/*TABSTOP=4*/
/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel). The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available. Producers are usually fast on the first few results but
** then would slow down. Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
** mkfifo fifo
** parallel ... >"$fifo" &
** while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
** process "$chunk"
** done
**
** The loop would run until all producers are done and all input is
** read. Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started. A chunk may also
** be empty if there was no new data for 10s.
*/
/*
** Analysis
**
** 1. If no data arrives at all for 10 seconds, then the program should
** terminate producing no output. This timeout is controlled by the
** value of time_chunk in the code.
** 2. If data arrives more or less consistently, then the collection
** should continue for 10s and then finish. This timeout is also
** controlled by the value of time_chunk in the code.
** 3. If a line of data arrives before 5 seconds have elapsed, and no
** more arrives for 5 seconds, then the collection should finish.
** (If the first line arrives after 5 seconds and no more arrives
** for more than 5 seconds, then the 10 second timeout cuts in.)
** This timeout is controlled by the value of time_delay in the code.
** 4. This means that we want two separate timers at work:
** - Chunk timer (started when the program starts).
** - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored. External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one time, not two.
** -- Using getitimer(2), setitimer(2) uses obsolescent POSIX functions,
** but these are available on macOS.
** -- Using timer_create(2), timer_destroy(2), timer_settime(2),
** timer_gettime(2) uses current POSIX function but is not available
** on macOS.
*/
#include "posixver.h"
#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>
#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */
static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
" -c chunk Maximum time to wait for data in a chunk (default 10)\n"
" -d delay Maximum delay after line read (default: 5)\n"
" -f file Read from file instead of standard input\n"
" -h Print this help message and exit\n"
" -v Verbose mode: print timing information to stderr\n"
" -V Print version information and exit\n"
;
static struct timespec time_delay = { .tv_sec = 5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;
static bool verbose = false;
static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);
// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);
#if defined(USE_ALARM)
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
alarm(time_chunk.tv_sec);
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
unsigned time_left = alarm(0);
if (time_left > time_delay.tv_sec)
alarm(time_delay.tv_sec);
else
alarm(time_left);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
alarm(0);
signal(SIGALRM, SIG_IGN);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
#elif defined(USE_SETITIMER)
static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_until;
if (getitimer(ITIMER_REAL, &tv_until) != 0)
err_syserr("failed to set interval timer: ");
struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);
if (verbose)
{
char buff1[32];
fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
char buff2[32];
fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
}
if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
{
if (verbose)
err_remark("---- %s(): no need for delay timer\n", __func__);
}
else
{
struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
tv_new.it_value = cvt_timespec_to_timeval(time_delay);
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
if (verbose)
err_remark("---- %s(): set delay timer\n", __func__);
}
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_new =
{
.it_value = { .tv_sec = 0, .tv_usec = 0 },
.it_interval = { .tv_sec = 0, .tv_usec = 0 },
};
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
#else /* USE_TIMER_GETTIME */
#include "timespec_math.h"
static timer_t t0 = { 0 };
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct sigevent ev =
{
.sigev_notify = SIGEV_SIGNAL,
.sigev_signo = SIGALRM,
.sigev_value.sival_int = 0,
.sigev_notify_function = 0,
.sigev_notify_attributes = 0,
};
if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
err_syserr("failed to create a timer: ");
struct itimerspec it =
{
.it_interval = { .tv_sec = 0, .tv_nsec = 0 },
.it_value = time_chunk,
};
struct itimerspec ot;
if (timer_settime(t0, 0, &it, &ot) != 0)
err_syserr("failed to activate timer: ");
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerspec time_until;
if (timer_gettime(t0, &time_until) != 0)
err_syserr("failed to set per-process timer: ");
char buff1[32];
fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
char buff2[32];
fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
if (cmp_timespec(time_until.it_value, time_delay) <= 0)
{
if (verbose)
err_remark("---- %s(): no need for delay timer\n", __func__);
}
else
{
struct itimerspec time_new =
{
.it_interval = { .tv_sec = 0, .tv_nsec = 0 },
.it_value = time_delay,
};
struct itimerspec time_old;
if (timer_settime(t0, 0, &time_new, &time_old) != 0)
err_syserr("failed to set per-process timer: ");
if (verbose)
err_remark("---- %s(): set delay timer\n", __func__);
}
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (timer_delete(t0) != 0)
err_syserr("failed to delete timer: ");
}
#endif /* Timing mode */
/* Writing to stderr via err_remark() is not officially supported */
static void alarm_handler(int signum)
{
assert(signum == SIGALRM);
if (verbose)
err_remark("---- %s(): signal %d\n", __func__, signum);
}
static void read_chunks(FILE *fp)
{
size_t num_data = 0;
size_t max_data = 0;
struct iovec *data = 0;
size_t buflen = 0;
char *buffer = 0;
ssize_t length;
size_t chunk_len = 0;
clock_gettime(CLOCK_REALTIME, &time_start);
set_chunk_timeout();
while ((length = getline(&buffer, &buflen, fp)) != -1)
{
if (num_data >= max_data)
{
size_t new_size = (num_data * 2) + 2;
void *newspace = realloc(data, new_size * sizeof(data[0]));
if (newspace == 0)
err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
data = newspace;
max_data = new_size;
}
data[num_data].iov_base = buffer;
data[num_data].iov_len = length;
num_data++;
if (verbose)
err_remark("Received line %zu\n", num_data);
chunk_len += length;
buffer = 0;
buflen = 0;
set_delay_timeout();
}
cancel_timeout();
if (chunk_len > 0)
{
if ((length = writev(STDOUT_FILENO, data, num_data)) < 0)
err_syserr("failed to write %zu bytes to standard output: ", chunk_len);
else if ((size_t)length != chunk_len)
err_error("failed to write %zu bytes to standard output "
"(short write of %zu bytes)\n", chunk_len, (size_t)length);
}
if (verbose)
err_remark("---- %s(): data written (%zu bytes)\n", __func__, length);
for (size_t i = 0; i < num_data; i++)
free(data[i].iov_base);
free(data);
free(buffer);
}
int main(int argc, char **argv)
{
const char *name = "(standard input)";
FILE *fp = stdin;
err_setarg0(argv[0]);
err_setlogopts(ERR_MICRO);
int opt;
while ((opt = getopt(argc, argv, optstr)) != -1)
{
switch (opt)
{
case 'c':
if (scn_timespec(optarg, &time_chunk) != 0)
err_error("Failed to convert '%s' into a time value\n", optarg);
break;
case 'd':
if (scn_timespec(optarg, &time_delay) != 0)
err_error("Failed to convert '%s' into a time value\n", optarg);
break;
case 'f':
if ((fp = fopen(optarg, "r")) == 0)
err_syserr("Failed to open file '%s' for reading: ", optarg);
name = optarg;
break;
case 'h':
err_help(usestr, hlpstr);
/*NOTREACHED*/
case 'v':
verbose = true;
break;
case 'V':
err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
/*NOTREACHED*/
default:
err_usage(usestr);
/*NOTREACHED*/
}
}
if (optind != argc)
err_usage(usestr);
if (verbose)
{
err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
err_remark("file: %s\n", name);
}
read_chunks(fp);
return 0;
}
我的 SOQ 存储库也有一个脚本 gen-data.sh
,它利用一些自定义程序生成这样的数据流(种子值写入标准错误,而不是标准输出):
$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$
当使用默认选项输入 chunker79
时,我得到如下输出:
$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$
如果您分析时间间隔(查看输出行中的前两个字段),则输出符合规范。更详细的分析如下:
$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000] - 5.026s - chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00] - 7.838s - gen-data.sh
$
在 chunker79
的输出出现和 gen-data.sh
完成之间,此设置中有一个明显的停顿。这是因为 Bash 等待管道中的所有进程完成,并且 gen-data.sh
直到下一次它在消息完成 chunker79
之后写入管道时才完成。这是此测试设置的产物;它不会成为问题中概述的 shell 脚本的一个因素。
引入 GNU Parallel 20200122 --blocktimeout
(--bt
):
find ~ | parallel -j3 --bt 2s --pipe wc
这与普通的 GNU Parallel 一样工作,除非它需要 > 2 秒来填充一个块。在那种情况下,到目前为止读取的块只是传递给 wc
(除非它是空的)。
它有一个稍微奇怪的启动行为:在输出稳定之前你必须等待 3*2s (jobslots*timeout),并且你至少每 2s 得到一个输出。
在 bash 脚本中,我使用了多生产者单消费者模式。生产者是将行写入 fifo 的后台进程(通过 GNU Parallel)。消费者从fifo中读取所有行,然后排序、过滤,并将格式化后的结果打印到stdout。
但是,可能需要很长时间才能获得完整结果。生产商通常在前几个结果上很快,但随后会放慢速度。在这里,我更感兴趣的是每隔几秒查看一次数据块,每个数据块都单独排序和过滤。
mkfifo fifo
parallel ... >"$fifo" &
while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
process "$chunk"
done
循环将 运行 直到所有生产者完成并读取所有输入。读取每个块,直到 5 秒内没有新数据,或者直到块开始后 10 秒。如果 10s 没有新数据,一个 chunk 也可能是空的。
我试着让它像这样工作:
output=$(mktemp)
while true; do
wasTimeout=0 interruptAt=$(( $(date '+%s') + 10 ))
while true; do
IFS= read -r -t5 <>"${fifo}"
rc="$?"
if [[ "${rc}" -gt 0 ]]; then
[[ "${rc}" -gt 128 ]] && wasTimeout=1
break
fi
echo "$REPLY" >>"${output}"
if [[ $(date '+%s') -ge "${interruptAt}" ]]; then
wasTimeout=1
break
fi
done
echo '---' >>"${output}"
[[ "${wasTimeout}" -eq 0 ]] && break
done
尝试了一些变体。在上面的表格中,它读取第一个块,然后永远循环。如果我使用 <"${fifo}"
(没有上面的 read/write),它会在第一个块之后阻塞。也许所有这些都可以用 buffer
and/or stdbuf
来简化?但是它们都是按大小而不是按时间来定义块的。
我会考虑用队列编写一个安全的 multi-threaded 程序。
我更了解 Java,但可能会有更现代的合适语言,例如 Go 和 Kotlin。
像这样:
#!/usr/bin/perl
$timeout = 3;
while(<STDIN>) {
# Make sure there is some input
push @out,$_;
eval {
local $SIG{ALRM} = sub { die };
alarm $timeout;
while(<STDIN>) {
alarm $timeout;
push @out,$_;
}
alarm 0;
};
system "echo","process",@out;
}
这不是一个需要解决的小问题。正如我所暗示的,C 程序(或使用 shell 以外的某种编程语言编写的程序)可能是最佳解决方案。一些复杂的因素是:
- 读取超时。
- 如果数据到达的足够快,超时会改变。
- 不同的系统有不同的间隔计时功能:
alarm()
可能随处可用,但只有 1 秒的分辨率,容易出现累积的舍入误差。 (使用make UFLAGS=-DUSE_ALARM
编译此版本;在 macOS 上,使用make UFLAGS=-DUSE_ALARM LDLIB2=
。)setitimer()
使用微秒计时和struct timeval
类型。 (使用make UFLAGS=-DUSE_SETITIMER
编译此版本;在 macOS 上,使用make UFLAGS=-DUSE_SETITIMER LDLIB2=
编译。)timer_create()
和timer_settime()
等使用现代纳秒类型struct timespec
。这在 Linux 上可用;它不适用于 macOS 10.14.5 Mojave 或更早版本。 (使用make
编译此版本;它不适用于 macOS。)
程序使用信息为:
$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
-c chunk Maximum time to wait for data in a chunk (default 10)
-d delay Maximum delay after line read (default: 5)
-f file Read from file instead of standard input
-h Print this help message and exit
-v Verbose mode: print timing information to stderr
-V Print version information and exit
$
此代码可在我的 SOQ (Stack Overflow Questions) repository on GitHub as file chunker79.c
in the src/so-5631-4784 sub-directory 中找到。您还需要 src/libsoq 目录中的一些支持代码。
/*
@(#)File: chunker79.c
@(#)Purpose: Chunk Reader for SO 5631-4784
@(#)Author: J Leffler
@(#)Copyright: (C) JLSS 2019
*/
/*TABSTOP=4*/
/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel). The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available. Producers are usually fast on the first few results but
** then would slow down. Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
** mkfifo fifo
** parallel ... >"$fifo" &
** while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
** process "$chunk"
** done
**
** The loop would run until all producers are done and all input is
** read. Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started. A chunk may also
** be empty if there was no new data for 10s.
*/
/*
** Analysis
**
** 1. If no data arrives at all for 10 seconds, then the program should
** terminate producing no output. This timeout is controlled by the
** value of time_chunk in the code.
** 2. If data arrives more or less consistently, then the collection
** should continue for 10s and then finish. This timeout is also
** controlled by the value of time_chunk in the code.
** 3. If a line of data arrives before 5 seconds have elapsed, and no
** more arrives for 5 seconds, then the collection should finish.
** (If the first line arrives after 5 seconds and no more arrives
** for more than 5 seconds, then the 10 second timeout cuts in.)
** This timeout is controlled by the value of time_delay in the code.
** 4. This means that we want two separate timers at work:
** - Chunk timer (started when the program starts).
** - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored. External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one time, not two.
** -- Using getitimer(2), setitimer(2) uses obsolescent POSIX functions,
** but these are available on macOS.
** -- Using timer_create(2), timer_destroy(2), timer_settime(2),
** timer_gettime(2) uses current POSIX function but is not available
** on macOS.
*/
#include "posixver.h"
#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>
#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */
static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
" -c chunk Maximum time to wait for data in a chunk (default 10)\n"
" -d delay Maximum delay after line read (default: 5)\n"
" -f file Read from file instead of standard input\n"
" -h Print this help message and exit\n"
" -v Verbose mode: print timing information to stderr\n"
" -V Print version information and exit\n"
;
static struct timespec time_delay = { .tv_sec = 5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;
static bool verbose = false;
static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);
// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);
#if defined(USE_ALARM)
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
alarm(time_chunk.tv_sec);
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
unsigned time_left = alarm(0);
if (time_left > time_delay.tv_sec)
alarm(time_delay.tv_sec);
else
alarm(time_left);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
alarm(0);
signal(SIGALRM, SIG_IGN);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
#elif defined(USE_SETITIMER)
static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_until;
if (getitimer(ITIMER_REAL, &tv_until) != 0)
err_syserr("failed to set interval timer: ");
struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);
if (verbose)
{
char buff1[32];
fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
char buff2[32];
fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
}
if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
{
if (verbose)
err_remark("---- %s(): no need for delay timer\n", __func__);
}
else
{
struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
tv_new.it_value = cvt_timespec_to_timeval(time_delay);
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
if (verbose)
err_remark("---- %s(): set delay timer\n", __func__);
}
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_new =
{
.it_value = { .tv_sec = 0, .tv_usec = 0 },
.it_interval = { .tv_sec = 0, .tv_usec = 0 },
};
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
#else /* USE_TIMER_GETTIME */
#include "timespec_math.h"
static timer_t t0 = { 0 };
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct sigevent ev =
{
.sigev_notify = SIGEV_SIGNAL,
.sigev_signo = SIGALRM,
.sigev_value.sival_int = 0,
.sigev_notify_function = 0,
.sigev_notify_attributes = 0,
};
if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
err_syserr("failed to create a timer: ");
struct itimerspec it =
{
.it_interval = { .tv_sec = 0, .tv_nsec = 0 },
.it_value = time_chunk,
};
struct itimerspec ot;
if (timer_settime(t0, 0, &it, &ot) != 0)
err_syserr("failed to activate timer: ");
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerspec time_until;
if (timer_gettime(t0, &time_until) != 0)
err_syserr("failed to set per-process timer: ");
char buff1[32];
fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
char buff2[32];
fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
if (cmp_timespec(time_until.it_value, time_delay) <= 0)
{
if (verbose)
err_remark("---- %s(): no need for delay timer\n", __func__);
}
else
{
struct itimerspec time_new =
{
.it_interval = { .tv_sec = 0, .tv_nsec = 0 },
.it_value = time_delay,
};
struct itimerspec time_old;
if (timer_settime(t0, 0, &time_new, &time_old) != 0)
err_syserr("failed to set per-process timer: ");
if (verbose)
err_remark("---- %s(): set delay timer\n", __func__);
}
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (timer_delete(t0) != 0)
err_syserr("failed to delete timer: ");
}
#endif /* Timing mode */
/* Writing to stderr via err_remark() is not officially supported */
static void alarm_handler(int signum)
{
assert(signum == SIGALRM);
if (verbose)
err_remark("---- %s(): signal %d\n", __func__, signum);
}
static void read_chunks(FILE *fp)
{
size_t num_data = 0;
size_t max_data = 0;
struct iovec *data = 0;
size_t buflen = 0;
char *buffer = 0;
ssize_t length;
size_t chunk_len = 0;
clock_gettime(CLOCK_REALTIME, &time_start);
set_chunk_timeout();
while ((length = getline(&buffer, &buflen, fp)) != -1)
{
if (num_data >= max_data)
{
size_t new_size = (num_data * 2) + 2;
void *newspace = realloc(data, new_size * sizeof(data[0]));
if (newspace == 0)
err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
data = newspace;
max_data = new_size;
}
data[num_data].iov_base = buffer;
data[num_data].iov_len = length;
num_data++;
if (verbose)
err_remark("Received line %zu\n", num_data);
chunk_len += length;
buffer = 0;
buflen = 0;
set_delay_timeout();
}
cancel_timeout();
if (chunk_len > 0)
{
if ((length = writev(STDOUT_FILENO, data, num_data)) < 0)
err_syserr("failed to write %zu bytes to standard output: ", chunk_len);
else if ((size_t)length != chunk_len)
err_error("failed to write %zu bytes to standard output "
"(short write of %zu bytes)\n", chunk_len, (size_t)length);
}
if (verbose)
err_remark("---- %s(): data written (%zu bytes)\n", __func__, length);
for (size_t i = 0; i < num_data; i++)
free(data[i].iov_base);
free(data);
free(buffer);
}
int main(int argc, char **argv)
{
const char *name = "(standard input)";
FILE *fp = stdin;
err_setarg0(argv[0]);
err_setlogopts(ERR_MICRO);
int opt;
while ((opt = getopt(argc, argv, optstr)) != -1)
{
switch (opt)
{
case 'c':
if (scn_timespec(optarg, &time_chunk) != 0)
err_error("Failed to convert '%s' into a time value\n", optarg);
break;
case 'd':
if (scn_timespec(optarg, &time_delay) != 0)
err_error("Failed to convert '%s' into a time value\n", optarg);
break;
case 'f':
if ((fp = fopen(optarg, "r")) == 0)
err_syserr("Failed to open file '%s' for reading: ", optarg);
name = optarg;
break;
case 'h':
err_help(usestr, hlpstr);
/*NOTREACHED*/
case 'v':
verbose = true;
break;
case 'V':
err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
/*NOTREACHED*/
default:
err_usage(usestr);
/*NOTREACHED*/
}
}
if (optind != argc)
err_usage(usestr);
if (verbose)
{
err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
err_remark("file: %s\n", name);
}
read_chunks(fp);
return 0;
}
我的 SOQ 存储库也有一个脚本 gen-data.sh
,它利用一些自定义程序生成这样的数据流(种子值写入标准错误,而不是标准输出):
$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$
当使用默认选项输入 chunker79
时,我得到如下输出:
$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$
如果您分析时间间隔(查看输出行中的前两个字段),则输出符合规范。更详细的分析如下:
$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000] - 5.026s - chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00] - 7.838s - gen-data.sh
$
在 chunker79
的输出出现和 gen-data.sh
完成之间,此设置中有一个明显的停顿。这是因为 Bash 等待管道中的所有进程完成,并且 gen-data.sh
直到下一次它在消息完成 chunker79
之后写入管道时才完成。这是此测试设置的产物;它不会成为问题中概述的 shell 脚本的一个因素。
引入 GNU Parallel 20200122 --blocktimeout
(--bt
):
find ~ | parallel -j3 --bt 2s --pipe wc
这与普通的 GNU Parallel 一样工作,除非它需要 > 2 秒来填充一个块。在那种情况下,到目前为止读取的块只是传递给 wc
(除非它是空的)。
它有一个稍微奇怪的启动行为:在输出稳定之前你必须等待 3*2s (jobslots*timeout),并且你至少每 2s 得到一个输出。