在 Siddhi 中获取 NATS 流媒体事件序列号
Getting the NATS streaming event sequence number in Siddhi
我有一个文本格式的推文流 (TwitterStream) 和每个推文的情绪流 (SentimentStream)。 SentimentStream 订阅 TwitterStream,进行情绪分析并发布带有结果和 TwitterStream 序列号的新消息。
我正在尝试加入这两个流,其中 SentimentStream.seq 等于推文的序列号。我遇到的问题是我无法从 TwitterStream 获得序列号的 "handle"。
我一直在努力寻找一种方法来获取事件 "metadata",这可能会提供一些关于事件的位置/序列号的见解。
@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')
@sink(type = 'log',
@map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);
@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);
@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);
-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;
任何帮助将不胜感激。
我们已通过 https://github.com/siddhi-io/siddhi-io-nats/issues/25 创建功能改进请求以提供此支持。
我有一个文本格式的推文流 (TwitterStream) 和每个推文的情绪流 (SentimentStream)。 SentimentStream 订阅 TwitterStream,进行情绪分析并发布带有结果和 TwitterStream 序列号的新消息。
我正在尝试加入这两个流,其中 SentimentStream.seq 等于推文的序列号。我遇到的问题是我无法从 TwitterStream 获得序列号的 "handle"。
我一直在努力寻找一种方法来获取事件 "metadata",这可能会提供一些关于事件的位置/序列号的见解。
@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')
@sink(type = 'log',
@map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);
@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);
@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);
-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;
任何帮助将不胜感激。
我们已通过 https://github.com/siddhi-io/siddhi-io-nats/issues/25 创建功能改进请求以提供此支持。