有没有办法通过 IBM Streams 中的操作员(而不是通过 Streams 控制台)捕获 tuple/sec
Is there a way to capure tuple/sec through an operator in IBM Streams (not through Streams console)
我想通过运算符捕获tuples/sec的数量并将其记录在文件中。我不能使用 'Throttle Operator' 自己设置元组率。另外,再补充一下,我说的不是通过控制台获取信息,而是通过SPL应用程序获取信息。
没有直接的 "give me the throughput for this operator" 指标可用。您可以实现一个原始运算符,随着时间的推移访问 nTuplesProcessed
指标并从中计算吞吐量。 (list of available metrics。)但是,我实际上发现使用以下复合运算符更容易:
public composite PeriodicThroughputSink(input In) {
param expression<float64> $period;
expression<rstring> $file;
graph
stream<boolean b> Period = Beacon() {
param period: $period;
}
stream<float64 throughput> Throughput = Custom(In; Period) {
logic state: {
mutable uint64 _count = 0;
float64 _period = $period;
}
onTuple In: {
++_count;
}
onTuple Period: {
if (_count > 0ul) {
submit({throughput=((float64)_count / _period)}, Throughput);
_count = 0ul;
}
}
config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file
// writing is on a different thread from the rest
// of the application
}
() as Sink = FileSink(Throughput) {
param file: $file;
format: txt;
flush: 1u;
}
}
然后您可以将复合运算符用作 "throughput tap",它使用来自您要记录其吞吐量的任何运算符的流。例如,您可以这样使用它:
stream<Data> Result = OperatorYouCareAbout(In) {}
() as ResultThroughput = PeriodicThroughputSink(Result) {
param period: 5.0;
file: "ResultThroughput.txt";
}
当然,您仍然可以在应用程序的其他地方使用 Result
流。请记住,此方法可能会对应用程序的性能产生一些影响:我们正在对数据路径进行分接。但是,影响应该不会很大,特别是如果您确保 PeriodicThroughputSink
中的运算符与您正在使用的任何运算符融合到同一个 PE 中。另外,周期越短,越有可能影响应用性能。
同样,我们可以通过访问 nTuplesProcessed
度量在 C++ 或 Java 原始运算符中做类似的事情,但我发现上述方法更容易。您还可以从应用程序外部获取系统指标;比如,你可以有一个定期使用 streamtool capturestate
或 REST API 的脚本,然后解析输出,找到你关心的运算符的 nTuplesProcessed
指标并使用它来计算吞吐量.但是我发现这个复合运算符中的技术更容易。
我想通过运算符捕获tuples/sec的数量并将其记录在文件中。我不能使用 'Throttle Operator' 自己设置元组率。另外,再补充一下,我说的不是通过控制台获取信息,而是通过SPL应用程序获取信息。
没有直接的 "give me the throughput for this operator" 指标可用。您可以实现一个原始运算符,随着时间的推移访问 nTuplesProcessed
指标并从中计算吞吐量。 (list of available metrics。)但是,我实际上发现使用以下复合运算符更容易:
public composite PeriodicThroughputSink(input In) {
param expression<float64> $period;
expression<rstring> $file;
graph
stream<boolean b> Period = Beacon() {
param period: $period;
}
stream<float64 throughput> Throughput = Custom(In; Period) {
logic state: {
mutable uint64 _count = 0;
float64 _period = $period;
}
onTuple In: {
++_count;
}
onTuple Period: {
if (_count > 0ul) {
submit({throughput=((float64)_count / _period)}, Throughput);
_count = 0ul;
}
}
config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file
// writing is on a different thread from the rest
// of the application
}
() as Sink = FileSink(Throughput) {
param file: $file;
format: txt;
flush: 1u;
}
}
然后您可以将复合运算符用作 "throughput tap",它使用来自您要记录其吞吐量的任何运算符的流。例如,您可以这样使用它:
stream<Data> Result = OperatorYouCareAbout(In) {}
() as ResultThroughput = PeriodicThroughputSink(Result) {
param period: 5.0;
file: "ResultThroughput.txt";
}
当然,您仍然可以在应用程序的其他地方使用 Result
流。请记住,此方法可能会对应用程序的性能产生一些影响:我们正在对数据路径进行分接。但是,影响应该不会很大,特别是如果您确保 PeriodicThroughputSink
中的运算符与您正在使用的任何运算符融合到同一个 PE 中。另外,周期越短,越有可能影响应用性能。
同样,我们可以通过访问 nTuplesProcessed
度量在 C++ 或 Java 原始运算符中做类似的事情,但我发现上述方法更容易。您还可以从应用程序外部获取系统指标;比如,你可以有一个定期使用 streamtool capturestate
或 REST API 的脚本,然后解析输出,找到你关心的运算符的 nTuplesProcessed
指标并使用它来计算吞吐量.但是我发现这个复合运算符中的技术更容易。