siddhi 有没有办法存储应用程序会话数据?

is there a way in siddhi to store app session data?

我有输入流 InputStream 从 Kafka 获取输入。然后我使用这个流来调用 HTTP API。在 HTTP 调用响应中,我必须使用 OutputStream

将结果发送到另一个 Kafka 主题

应用流程
InputStream(有 userId)=> RequestHttpStream => ResponseHTTPStream(没有 userId)=> OutputStream(这里需要 userId)

问题是,我需要 userId 将事件发送到 OutputStream,但响应 HTTP 流 ResponseHTTPStream 没有 userId 的概念。所以我在 siddhi 中寻找存储和检索一些 key/values 映射的方法。我在 siddhi 中读到过 tables,但它似乎在所有事件之间共享。我需要的有点像 app session data ,它只存在于那个会话中。我可以为此使用 partitions 吗?

代码


@source(type = 'kafka',
    topic.list = "user-entered-code", 
    bootstrap.servers = "kafka:9092", 
    @map(type = 'json', validate.json = "true", 
        @payload("""{"userId":"{{userId}}","promoCode":{{promoCode}}}""")))
define stream InputStream (userId string, promoCode string);

@sink(type = 'kafka', topic = "user-code-attribution-updated", bootstrap.servers = "kafka:9092", 
    @map(type = 'json', validate.json = "true", 
        @payload("""{"userId":"{{userId}}","attributes": {{attributes}}""")))
define stream OutputStream (userId string, attributes object);

@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request",  publisher.url = "http://test-node-server:3031/search-attribution-id", 
    @map(type = 'json'))
define stream RequestHttpStream(attributionId string);

@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200", 
    @map(type = 'json'))
define stream ResponseHTTPStream (attributes object);

from InputStream
select promoCode as attributionId
insert into RequestHttpStream;

from ResponseHTTPStream
select 'need userId here' as userId, attributes as attributes
insert into OutputStream;

您的用例是在 http-call 接收器和 http-call-响应源之间保留一个属性。这可以通过使用传输属性来实现。由于在source中可以通过引用trp:

访问到sink中的属性
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request",  publisher.url = "http://test-node-server:3031/search-attribution-id", 
    @map(type = 'json', 
        @payload("""{"attributionId":"{{attributionId}}"""")))
define stream RequestHttpStream(attributionId string, userId string);

@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200", 
    @map(type = 'json', 
       @attributes(attributes='attributes', userId='userId')))
define stream ResponseHTTPStream (attributes object, userId string);

但是请注意,您必须在接收器中使用自定义映射,因为您不想将 userId 发送到 HTTP 端点