将统计信息从 wso2 EI 发布到 wso2 流处理器

Publish statistic from wso2 EI to wso2 Stream Processor

我需要知道如何通过事件发布者将统计信息从 Enteprise Integrator 发布到流处理器。

我在我的 EI 上实施了以下事件发布者

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="MessageFlowStatisticsPublisher"
  statistics="enable" trace="enable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="org.wso2.esb.analytics.stream.FlowEntry" version="1.0.0"/>
  <mapping customMapping="disable" type="wso2event"/>
  <to eventAdapterType="wso2event">
    <property name="username">admin</property>
    <property name="protocol">thrift</property>
    <property name="publishingMode">non-blocking</property>
    <property name="publishTimeout">0</property>
    <property name="receiverURL">tcp://xxx:7611</property>
    <property encrypted="true" name="password">xxx</property>
  </to>
</eventPublisher>

在流处理器上,我有一个简单的 siddhi 应用程序用于接收数据并将它们打印到日志中,如下所示

@App:name("FlowEntryApp")
@App:description("Plan of flow entry")

@source(type='wso2event', @map(type = 'wso2event'))
define stream FlowEntry(compressed bool, tenantId int, messageId string, flowData string);

@sink(type='log', prefix='My flowEntry:')
define stream TestOutputFlowEntry(messageId string, flowData string);

@info(name='FlowEntryOutput')
from FlowEntry
select messageId, flowData
group by messageId
insert into TestOutputFlowEntry;

我还为我的代理服务将所有用于发布统计信息的配置设置为 "enable statistic" 和 "enable trace"。当我调用我的服务时,eventPublisher 将 wso2event 发送到 SP,这工作正常。但是在SP这边,SP处理错误"No StreamDefinition for streamId org.wso2.esb.analytics.stream.FlowEntry:1.0.0 present in cache"

我知道,问题出在 siddhi 应用程序中,我定义流 "FlowEntry" 而不是 "org.wso2.esb.analytics.stream.FlowEntry",但 siddhi 语言不支持像“.”这样的字符。在流名称中。

所以我尝试更改 EI 站点上的流名称,仅将 eventPublisher 中的 streamName 更改为 'FlowEntry',我还更改了 eventstream 文件夹内 json 文件中的 streamName 但现在当我调用我的服务时, EI 不会向 SP 发送任何事件。

有人知道如何将 org.wso2.esb.analytics.stream.FlowEntry 流发布到 SP,然后由 siddhi 处理吗?

可以使用源注释中的 wso2.stream.id 元素覆盖流名称。

@source(type='wso2event', wso2.stream.id='org.wso2.esb.analytics.stream.FlowEntry', @map(type = 'wso2event'))<br>
define stream FlowEntry(compressed bool, tenantId int, messageId string, flowData string);

通过使用上面的源定义,'FlowEntry' 仍然可以在 Siddhi 应用程序中使用,而在 thrift 服务器流 id 将被定义为 'org.wso2.esb.analytics.stream.FlowEntry:1.0.0'。

此致。