如何使用接收器类型为 "http-response" 的另一个流的响应更新流

How to update a stream with the response from another stream where the sink type is "http-response"

我正在尝试使用通过 "http-response" 响应接收器填充的附加属性来丰富我的输入流。

我尝试使用带有 window 属性和 "every" 关键字的 "join" 来合并两个流,并将生成的合并流插入另一个流以丰富它。

window 属性(window.time(1 秒)或 window.length(1))和 "every" 关键字在传入事件以常规方式出现时效果很好间隔 1 秒或更长时间。

当(例如 10 或 100)事件同时(在一秒内)发送时。那么合并的结果不是预期的。

具有"window"属性的(加入)

**

from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
select i.variable1 as variable1, i.variable2 as variable2, s.variable2 as variable2
insert into EventOutputStream;

**

带有"every"关键字的那个

**

from every e1=EventInputStream,e2=EventResponseStream
select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
insert into EventOutputStream;

**

是否有更好的方法合并两个流以更新第三个流?

要获取原始请求属性,可以使用自定义映射,如下所示,

@source(type='http-call-response', sink.id='source-1'
       @map(type='json',@attributes(name='name', id='id', volume='trp:volume', price='trp:price')))
define stream responseStream(name String, id int, headers String, volume long, price float);

在这里,可以使用 trp:attributeName 访问请求属性,在此示例中只有名称来自响应,价格和数量来自请求。

您的 'every' 关键字方法中的语法不太正确。你有没有试过这样的事情:

from every (e1 = event1) -> e2=event2[e1.variable == e2.variable]
select e1.variable1, e2.variable1, e2.variable2
insert into outputEvent;

This document 可能会有帮助。