是否可以对从 Siddhi 查询中的 JSON 输入中提取的字段进行分组?

Is it possible to Group By a field extracted from JSON input in a Siddhi Query?

我目前有一个流,其中包含 1 个包含 Json 事件的字符串属性。

这个流接收不同的事件,我想应用 Json 路径表达式,这样我就可以在过滤器和函数上使用这些属性。

Json路径提取器在过滤器和选择器上的作用就像一个魅力,不幸的是,我无法将它们用于 'Group By' 部分。 我实际上是在一个嵌入式 Siddhi 应用程序中进行的,手动添加了 siddhi-execution-json 扩展名,但为了讨论,所以每个人都可以 轻松检查和测试它,我将粘贴一个适用于 WSO2 流处理器的示例应用程序。 objective 看起来像下面的应用程序:

@App:name("Group_by_json_attribute")

define stream JsonStream(json string);

@sink(type='log')
define stream LogStream(myField string, count long);

@info(name='query1')
from JsonStream#window.time(10 sec)
select json:getString(json, '$.myField') as myField, count() as count 
group by myField having count > 1 
insert into LogStream;

并且它可以接受以下事件:

{"myField": "my_value"}

但是,此查询将引发错误:

Cannot find attribute type as 'myField' does not exist in 'JsonStream'; define stream JsonStream(json string)

我也试过在 'Group by':

直接使用 Json 提取器

group by json:getString(json, '$.myField') as myField having count > 1

但是现在的错误是:

mismatched input ':' expecting {',', ORDER, LIMIT, OFFSET, HAVING, INSERT, DELETE, UPDATE, RETURN, OUTPUT}

似乎不​​希望在这里使用扩展

我只是想知道,是否可以按输入流中未直接定义的属性进行分组。在这种情况下是从 JSON 对象中提取的字段,但它可以是生成另一个属性的任何其他函数。

我也在使用来自 maven 中央存储库的版本

(Edit) Clarification

objective是,使用不直接在Stream中定义的属性,要在Group By上使用。

原因是,我目前有一个嵌入式应用程序,它定义了来自外部源的整套输入流,格式为 JSON,并且还有一组输出流来通知外部组件查询匹配。 此应用程序允许用户在这组预定义的流上创建自定义查询,但他们无法自己创建流。

非常感谢!

我们似乎期望来自查询输入流的字段分组,在本例中为 JsonStream。在此之前使用另一个查询进行提取,并在以下查询中进行聚合和过滤,

@App:name("Group_by_json_attribute")

define stream JsonStream(json string);

@sink(type='log')
define stream LogStream(myField string, count long);

@info(name='extract_stream')
from JsonStream
select json:getString(json, '$.myField') as myField 
insert into ExtractedStream;

@info(name='query1')
from ExtractedStream#window.time(10 sec)
select myField, count() as count 
group by myField 
having count > 1 
insert into LogStream;