`exec.Command()` produces no output when running `kcat`
I need to wrap kcat in a Go function to read a range of topic messages, so I thought of using exec.Command() as follows:
package main

import (
    "fmt"
    "os/exec"
)

func main() {
    cmd := exec.Command("kcat", "-b kafka.kafka.svc.cluster.local:9092", "-t messages", "-o 11000", "-c 11333")
    fmt.Println("Command String:", cmd.String())
    out, err := cmd.CombinedOutput()
    if err != nil {
        fmt.Println("Error Accessing kafka topic messages ", err.Error(), string(out))
        return
    }
    fmt.Println("Result Length:", len(out))
    fmt.Println("Result Content:", string(out))
}
However, this returns only the first line of kcat's output:
/app/tools # ./five
Command String: /usr/bin/kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 11000 -c 11333
Result Length: 58
Result Content: % Auto-selecting Producer mode (use -P or -C to override)
(Note: I am running this in a Docker container, but I don't think that makes a difference.)
However, when run directly from the CLI:
/app/tools #
/app/tools # kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 10 -c 15
% Auto-selecting Consumer mode (use -P or -C to override)
%4|1640957136.462|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [1]: offset reset (at offset 10) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
%4|1640957136.483|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [2]: offset reset (at offset 10) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
[{"Name":"newOrder", "ID":"9266","Time":"9266","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"1547","Time":"1547","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"9179","Time":"9179","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"8740","Time":"8740","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"9318","Time":"9318","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"1743","Time":"1743","Data":"new order", "Eventname":"newOrder"}]
There seems to be something unique about the kcat command that breaks exec.Command() in Go.
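Questions:
- Is there another way to achieve the same thing in Go?
- Could this be a problem with how I am using exec.Command()?
Ideally I would like to keep using the kcat command here, because I want to avoid segmentio's kafka-go library in this case.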
[Edit]
- Separated arguments (as suggested by @onecricketeer):
cmd := exec.Command("kcat", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "11000", "-c", "11333")
Result (same error):
/app/tools # ./code
Command String: /usr/bin/kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 11000 -c 11333
Result Length: 58
Result Content: % Auto-selecting Producer mode (use -P or -C to override)
- Using Bash as the shell (as suggested by maxm):
Same result, i.e. only the first line of kcat's output is reported:
/app/tools # ./code
Command String: /bin/bash -c kcat -b kafka.kafka.svc.cluster.local:9092 -t messages -o 11000 -c 11333
Result Length: 58
Result Content: % Auto-selecting Producer mode (use -P or -C to override)
[Edit]
NOTE: However, when I use Python's subprocess mechanism it works fine, which makes me wonder whether Go's shell handling is flawed:
import subprocess

process = subprocess.Popen(["kcat", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "1", "-c", "11"],
                           stdout=subprocess.PIPE,
                           universal_newlines=True)

while True:
    output = process.stdout.readline()
    print(output.strip())
    # Do something else
    return_code = process.poll()
    if return_code is not None:
        print('RETURN CODE', return_code)
        # Process has finished, read rest of the output
        for output in process.stdout.readlines():
            print(output.strip())
        break
Result:
/app/tools/python # python3 code.py
% Auto-selecting Consumer mode (use -P or -C to override)
%4|1641004616.232|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [2]: offset reset (at offset 1) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
%4|1641004616.236|OFFSET|rdkafka#consumer-1| [thrd:main]: messages [1]: offset reset (at offset 1) to END: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
[{"Name":"newOrder", "ID":"4512","Time":"4512","Data":"new order", "Eventname":"newOrder"}]
RETURN CODE 0
[{"Name":"newOrder", "ID":"2388","Time":"2388","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"8707","Time":"8707","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"1643","Time":"1643","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"2421","Time":"2421","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"7520","Time":"7520","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"1258","Time":"1258","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"1457","Time":"1457","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"2907","Time":"2907","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"9266","Time":"9266","Data":"new order", "Eventname":"newOrder"}]
[{"Name":"newOrder", "ID":"1547","Time":"1547","Data":"new order", "Eventname":"newOrder"}]
The Go command:
cmd := exec.Command("kcat", "-b kafka.kafka.svc.cluster.local:9092", "-t messages", "-o 11000", "-c 11333")
is equivalent to the shell command:
kcat "-b kafka.kafka.svc.cluster.local:9092" "-t messages" "-o 11000" "-c 11333"
You need to separate your arguments, just as the shell does for you on each space by default:
cmd := exec.Command("kcat", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "11000", "-c", "11333")
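To make the difference concrete, here is a minimal sketch of what the shell's whitespace splitting does to the fused strings. exec.Command() does not invoke a shell, so each string you pass becomes exactly one argv entry. cmd.String() simply joins the entries with spaces, which is why the printed "Command String" looks correct even when the arguments are fused. The helper name splitArgs is hypothetical, and strings.Fields ignores quoting, so this is only an approximation of real shell parsing.

```go
package main

import (
	"fmt"
	"strings"
)

// splitArgs (hypothetical helper) breaks each fused "flag value" string
// on whitespace so that every flag and every value is its own argv entry.
// It does not understand shell quoting or escaping.
func splitArgs(fused []string) []string {
	var argv []string
	for _, f := range fused {
		argv = append(argv, strings.Fields(f)...)
	}
	return argv
}

func main() {
	fused := []string{"-b kafka.kafka.svc.cluster.local:9092", "-t messages"}
	// %q quotes each element, making the argument boundaries visible.
	fmt.Printf("before: %q\n", fused)
	fmt.Printf("after:  %q\n", splitArgs(fused))
}
```

Printing the arguments with %q (or printing cmd.Args) is a quick way to check what the child process will actually receive.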
As the output shows, Producer mode is being auto-selected.
Try Consumer mode explicitly, with separated arguments:
cmd := exec.Command("kcat", "-C", "-b", "kafka.kafka.svc.cluster.local:9092", "-t", "messages", "-o", "11000", "-c", "11333")
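Putting both suggestions together, a sketch of the wrapper might look like the following. It assumes kcat is on the PATH and reuses the broker, topic, offset and count from the question. The helper name consumerArgs is hypothetical. As far as I know, kcat's documentation says the mode is guessed from whether stdin/stdout are terminals, which would plausibly differ between an interactive shell and exec.Command(); passing -C avoids relying on that guess.

```go
package main

import (
	"bufio"
	"fmt"
	"os/exec"
	"strconv"
)

// consumerArgs (hypothetical helper) builds the kcat argument list with
// consumer mode forced via -C and each flag and value as a separate entry.
func consumerArgs(broker, topic string, offset, count int) []string {
	return []string{
		"-C",
		"-b", broker,
		"-t", topic,
		"-o", strconv.Itoa(offset),
		"-c", strconv.Itoa(count),
	}
}

func main() {
	args := consumerArgs("kafka.kafka.svc.cluster.local:9092", "messages", 11000, 11333)
	cmd := exec.Command("kcat", args...)

	// Read stdout only; kcat's diagnostic lines are not captured here.
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		fmt.Println("pipe error:", err)
		return
	}
	if err := cmd.Start(); err != nil {
		fmt.Println("start error:", err)
		return
	}

	// Stream the output line by line instead of buffering it all.
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		fmt.Println("message:", scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("read error:", err)
	}
	if err := cmd.Wait(); err != nil {
		fmt.Println("kcat exited with error:", err)
	}
}
```

Streaming through StdoutPipe mirrors the readline loop in the Python version; CombinedOutput() should also work if you prefer to collect everything at once.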