Siddhi 流式传输无模式数据

Siddhi stream schemaless data

我有一个来自 ActiveMQ 的源数据,我遇到的问题是这些数据没有固定的结构,因此,当我定义流时它会抛出一个不兼容的数据类型错误,有什么方法可以调节某些条件下的源流?

提前致谢。

/*
* Origin of data.
*/
@source(type='jms',
        @map(type='csv', delimiter=',', fail.on.unknown.attribute='false'),
        factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url='tcp://127.0.0.1:61616',
        destination='simulatedData',
        connection.factory.type='queue',
        connection.factory.jndi.name='QueueConnectionFactory',
        transport.jms.SubscriptionDurable='true',
        transport.jms.DurableSubscriberClientID='wso2SPclient1')


define stream FileSourceProductionStream(type string, time long, studentId string, fileId string, totalAccesses float); /* totalAccesses : float Incompatible DataType*/
define stream TaskSourceProductionStream(type string, time long, studentId string, taskId string, deadline long); /*deadline: long Incompatible DataType*/

在 siddhi 中,源流应该具有输入数据的模式。因此,我们无法将无模式数据接收到已定义的流。

针对您的场景的一种可能解决方案是定义一个具有所有可能输入属性的流,并将输入预格式化为 JSON[1]、XML[2] 或 TEXT[3 ] siddhi-map 扩展支持的格式,具有属性名称和值。对于缺少的属性,json/xml/text 输入负载中不会有键或标签。

然后在source config中使用@map(fail.on.missing.attributes='false)配置

那么对于输入payload中所有缺失的属性,都会将NULL赋值给输入流的对应属性。

[1] https://wso2-extensions.github.io/siddhi-map-json/api/4.0.20/

[2]https://wso2-extensions.github.io/siddhi-map-xml/api/4.0.12/

[3]https://wso2-extensions.github.io/siddhi-map-text/api/1.0.16/