如何使用 grep 实用程序处理连续流输出?

How to process continuous stream output with grep utility?

我有一个要求,我的 curl 命令从流式 HTTP 服务接收连续输出。流永无止境。我只想从流中 grep 一个字符串,然后 pass/pipe 此命令输出到另一个实用程序(例如 xargs ),例如 echo,以进行进一步的连续处理。

这是连续流的输出,只有当我结束 运行 curl 命令时我才会停止接收它。

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N 

[{"header":{"queryId":"none","schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `SENSOR_ID` STRING, `TEMP` BIGINT, `HUM` BIGINT"}},
{"row":{"columns":[1599624891102,"S2","S2",40,20]}},
{"row":{"columns":[1599624891113,"S1","S1",90,80]}},
{"row":{"columns":[1599624909117,"S2","S2",40,20]}},
{"row":{"columns":[1599624909125,"S1","S1",90,80]}},
{"row":{"columns":[1599625090320,"S2","S2",40,20]}},

现在,当我将输出通过管道传递给 grep 时,它按预期工作并且我不断收到任何新事件。

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | grep S1

{"row":{"columns":[1599624891113,"S1","S1",90,80]}},
{"row":{"columns":[1599624909125,"S1","S1",90,80]}},

但是当我将此 grep 输出通过管道传输到 xargs 和 echo 时,输出根本就没有移动。

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | grep S1 | xargs -I {} echo {} 
^C

当我从中间删除 grep 时,它按预期工作。

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | xargs -I {} echo {} 

[{header:{queryId:none,schema:`ROWTIME` BIGINT, `ROWKEY` STRING, `SENSOR_ID` STRING, `TEMP` BIGINT, `HUM` BIGINT}},
{row:{columns:[1599624891102,S2,S2,40,20]}},
{row:{columns:[1599624891113,S1,S1,90,80]}},
{row:{columns:[1599624909117,S2,S2,40,20]}},
{row:{columns:[1599624909125,S1,S1,90,80]}},
{row:{columns:[1599625090320,S2,S2,40,20]}},

看起来 grep 正在寻找要结束的输入,然后才能进一步通过管道传输。当我用有限的输入测试同样的东西时,它按预期工作。

ls | grep sh | xargs -I {} echo {};

abcd.sh
123.sh
pqr.sh
xyz.sh

所以,问题是:我的理解是否正确?有没有办法让 grep 可以将输出实时传递给后续命令?我想在进一步的脚本中保留一些基本的过滤逻辑,因此希望 grep 工作。

提前致谢!

阿努拉格

正如@larsks 所建议的,grep 的“--line-buffered flush output on every line”选项在测试与您类似的要求时工作正常。

所以命令是

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | grep S1 --line-buffered | xargs -I {} echo {}

我测试了“/var/log/messages”文件,该文件不断更新如下:

[root@project1-master ~]# tail -f /var/log/messages | grep journal --line-buffered | xargs -I {} echo {}

9 月 11 日 11:15:47 project1-master 日志:I0911 15:15:47.448254 1 node_lifecycle_controller.go:1429] 正在初始化区域的驱逐指标:

9 月 11 日 11:15:52 project1-master 日志:I0911 15:15:52.448704 1 node_lifecycle_controller.go:1429] 正在初始化区域的驱逐指标:

9 月 11 日 11:15:54 project1-master journal: 2020-09-11 15:15:54.006 [INFO][46] felix/int_dataplane.go 1300: 应用数据平面更新