siddhi 有没有办法存储应用程序会话数据?
is there a way in siddhi to store app session data?
我有输入流 InputStream
从 Kafka 获取输入。然后我使用这个流来调用 HTTP API。在 HTTP 调用响应中,我必须使用 OutputStream
将结果发送到另一个 Kafka 主题
应用流程
InputStream
(有 userId)=> RequestHttpStream
=> ResponseHTTPStream
(没有 userId)=> OutputStream
(这里需要 userId)
问题是,我需要 userId
将事件发送到 OutputStream
,但响应 HTTP 流 ResponseHTTPStream
没有 userId
的概念。所以我在 siddhi 中寻找存储和检索一些 key/values 映射的方法。我在 siddhi 中读到过 tables
,但它似乎在所有事件之间共享。我需要的有点像 app session data
,它只存在于那个会话中。我可以为此使用 partitions
吗?
代码
@source(type = 'kafka',
topic.list = "user-entered-code",
bootstrap.servers = "kafka:9092",
@map(type = 'json', validate.json = "true",
@payload("""{"userId":"{{userId}}","promoCode":{{promoCode}}}""")))
define stream InputStream (userId string, promoCode string);
@sink(type = 'kafka', topic = "user-code-attribution-updated", bootstrap.servers = "kafka:9092",
@map(type = 'json', validate.json = "true",
@payload("""{"userId":"{{userId}}","attributes": {{attributes}}""")))
define stream OutputStream (userId string, attributes object);
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request", publisher.url = "http://test-node-server:3031/search-attribution-id",
@map(type = 'json'))
define stream RequestHttpStream(attributionId string);
@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200",
@map(type = 'json'))
define stream ResponseHTTPStream (attributes object);
from InputStream
select promoCode as attributionId
insert into RequestHttpStream;
from ResponseHTTPStream
select 'need userId here' as userId, attributes as attributes
insert into OutputStream;
您的用例是在 http-call 接收器和 http-call-响应源之间保留一个属性。这可以通过使用传输属性来实现。由于在source中可以通过引用trp:
访问到sink中的属性
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request", publisher.url = "http://test-node-server:3031/search-attribution-id",
@map(type = 'json',
@payload("""{"attributionId":"{{attributionId}}"""")))
define stream RequestHttpStream(attributionId string, userId string);
@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200",
@map(type = 'json',
@attributes(attributes='attributes', userId='userId')))
define stream ResponseHTTPStream (attributes object, userId string);
但是请注意,您必须在接收器中使用自定义映射,因为您不想将 userId 发送到 HTTP 端点
我有输入流 InputStream
从 Kafka 获取输入。然后我使用这个流来调用 HTTP API。在 HTTP 调用响应中,我必须使用 OutputStream
应用流程
InputStream
(有 userId)=> RequestHttpStream
=> ResponseHTTPStream
(没有 userId)=> OutputStream
(这里需要 userId)
问题是,我需要 userId
将事件发送到 OutputStream
,但响应 HTTP 流 ResponseHTTPStream
没有 userId
的概念。所以我在 siddhi 中寻找存储和检索一些 key/values 映射的方法。我在 siddhi 中读到过 tables
,但它似乎在所有事件之间共享。我需要的有点像 app session data
,它只存在于那个会话中。我可以为此使用 partitions
吗?
代码
@source(type = 'kafka',
topic.list = "user-entered-code",
bootstrap.servers = "kafka:9092",
@map(type = 'json', validate.json = "true",
@payload("""{"userId":"{{userId}}","promoCode":{{promoCode}}}""")))
define stream InputStream (userId string, promoCode string);
@sink(type = 'kafka', topic = "user-code-attribution-updated", bootstrap.servers = "kafka:9092",
@map(type = 'json', validate.json = "true",
@payload("""{"userId":"{{userId}}","attributes": {{attributes}}""")))
define stream OutputStream (userId string, attributes object);
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request", publisher.url = "http://test-node-server:3031/search-attribution-id",
@map(type = 'json'))
define stream RequestHttpStream(attributionId string);
@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200",
@map(type = 'json'))
define stream ResponseHTTPStream (attributes object);
from InputStream
select promoCode as attributionId
insert into RequestHttpStream;
from ResponseHTTPStream
select 'need userId here' as userId, attributes as attributes
insert into OutputStream;
您的用例是在 http-call 接收器和 http-call-响应源之间保留一个属性。这可以通过使用传输属性来实现。由于在source中可以通过引用trp:
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request", publisher.url = "http://test-node-server:3031/search-attribution-id",
@map(type = 'json',
@payload("""{"attributionId":"{{attributionId}}"""")))
define stream RequestHttpStream(attributionId string, userId string);
@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200",
@map(type = 'json',
@attributes(attributes='attributes', userId='userId')))
define stream ResponseHTTPStream (attributes object, userId string);
但是请注意,您必须在接收器中使用自定义映射,因为您不想将 userId 发送到 HTTP 端点