现有流上的聚合计数器

Aggregate-counter on an existing stream

我正在尝试为我设置的各种流创建一个聚合计数器。在 SpringXD 中,它看起来像这样:"tap:stream:MyCustomStream > aggregate-counter".

到目前为止,我在 Spring Cloud Dataflow 中完成了“:MyKafkaTopic > aggregate-counter”,这似乎创建了一个 Kafka 消费者并读取有效负载以确定主题上的事件计数。我希望能够利用任何流,而不仅仅是 Kafka 源,例如"MyApp1 | MyApp2" --name MyCustomStream.

提供的示例 "stream create --definition ":mainstream.http > counter" --name tap_at_http --deploy" 本质上假设 mainstream.http 是 Kafka 主题(或 RabbitMQ 主题) .

有人做过吗?

以你为例,

stream create foo --definition "MyApp1 | MyApp2"

如果您必须在生产者 MyApp1 级别 TAP foo 流,则您的 TAP 流需要以下内容。

stream create bar --definition ":foo.MyApp1 > MyApp3"

您只是指向流中的制作者,您希望在其中点击以获取相同数据的副本。格式为::<streamName>.<label/appName>。您也可以使用 "labels" 代替应用名称。请查看 reference guide 了解更多详情。

The provided example "stream create --definition ":mainstream.http > counter" --name tap_at_http --deploy" essentially assumes mainstream.http is a Kafka topic (or RabbitMQ topic).

在这种情况下,mainstream 是流名称,您在 http 源应用程序中 TAP,等同于 :mainstream.http

这类似于 Spring XD 中的 tap:stream:foo。默认情况下,如果仅在流中存在,Spring XD 会假定生产者。但是,当您在处理器上进行 TAP 时,您必须指定它。

在 SCDF 中,我们特别要求它使其更具描述性,并且 DSL 也易于遵循。