siddhi - 无法使用 siddhi 从 rabbitmq 检索事件消息
siddhi - Can't retrieve event message from rabbitmq with siddhi
/**
 * Reproduction case from the question: assembles a Siddhi app that declares a RabbitMQ
 * source, starts the runtime, and prints every event handed to a callback on "FooStream".
 *
 * This is the snippet reported as not receiving any messages; the inline review notes
 * point at the places that differ from the working example given in the answer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
// The whole Siddhi app is built as a single string.
String siddhiApp = "@App:name('TestExecutionPlan') "
// NOTE(review): the attribute is spelled "teste" here, but the query below selects "test".
+ "define stream FooStream (teste string); "
// NOTE(review): @info and @source are written directly before the BarStream definition,
// not before FooStream or the query -- presumably Siddhi binds them to BarStream;
// confirm against the Siddhi query guide.
+ "@info(name = 'query1') "
+ "@source(type ='rabbitmq', "
+ "uri = 'amqp://test:test@192.168.99.100:5672', "
+ "exchange.name = 'amq.topic', "
+ "exchange.type = 'topic', "
+ "routing.key= '#', "
+ "queue.name = 'siddhi-queue', "
+ "@map(type='text')) "
+ "Define stream BarStream (test string); "
+ "from FooStream select test insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
// NOTE(review): the runtime is started before the callback is registered, and the
// callback listens on "FooStream" rather than "BarStream".
siddhiAppRuntime.start();
siddhiAppRuntime.addCallback("FooStream", new StreamCallback() {
// Prints each batch of events delivered to the callback.
public void receive(Event[] event) {
EventPrinter.print(event);
}
});
}
此代码无法从 rabbitmq 检索事件消息。
我可以在 rabbitmq 管理界面(dashboard)中看到已建立的连接和通道,而且所有发布到 exchange 的消息都被投递到了其他已绑定的队列。
抱歉各位,我发现了问题。
log4j.properties 未配置。
# Root logger: level INFO, output goes to the appender named "stdout".
log4j.rootLogger=INFO, stdout
# The "stdout" appender writes to the console.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
# Records are formatted by a PatternLayout using the conversion pattern below.
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%m%n
您需要为 "BarStream" 添加回调,因为它才是事件来源(@source 注解定义在该流上)。您可以尝试以下示例。
/**
 * Round-trip example for the RabbitMQ extension.
 *
 * <p>One Siddhi app declares a RabbitMQ {@code @source} in front of "BarStream" and counts
 * the events delivered to a callback on that stream. A second app declares a RabbitMQ
 * {@code @sink} in front of its own "BarStream" and forwards everything sent to
 * "FooStream" into it. Three events are pushed into the second app and the test then
 * waits for the first app to receive them.
 *
 * <p>Both runtimes are shut down in a {@code finally} block so that a failure while
 * sending or waiting does not leave them running, and the received count is checked
 * explicitly so the test fails when the events do not arrive.
 *
 * @throws InterruptedException if sending the events or waiting for them is interrupted
 */
@Test
public void rabbitmqSourceTest() throws InterruptedException {
    final int expectedEvents = 3;
    AtomicInteger eventCount = new AtomicInteger(0);
    String siddhiApp = "@App:name('TestExecutionPlan') "
            + "define stream FooStream (test string); "
            + "@info(name = 'query1') "
            + "@source(type ='rabbitmq', "
            + "uri = 'amqp://guest:guest@172.17.0.2:5672', "
            + "exchange.name = 'amq.topic', "
            + "exchange.type = 'topic', "
            + "routing.key= '#',"
            + "queue.name = 'siddhi-queue', "
            + "@map(type='text')) "
            + "define stream BarStream (test string); ";
    SiddhiManager siddhiManager = new SiddhiManager();
    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
    // Register the callback before starting, on the stream the @source annotation precedes.
    siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                EventPrinter.print(event);
                eventCount.incrementAndGet();
            }
        }
    });

    SiddhiAppRuntime executionPlanRuntime = null;
    try {
        siddhiAppRuntime.start();

        // Publisher side: events sent to FooStream are inserted into BarStream,
        // which is the stream the @sink annotation precedes.
        executionPlanRuntime = siddhiManager.createSiddhiAppRuntime(
                "@App:name('TestExecutionPlan') " +
                "define stream FooStream (test string); " +
                "@info(name = 'query1') " +
                "@sink(type ='rabbitmq', uri = 'amqp://guest:guest@172.17.0.2:5672', " +
                "exchange.type='topic', " +
                "exchange.name = 'amq.topic', " +
                "@map(type='text'))" +
                "Define stream BarStream (test string);" +
                "from FooStream select test insert into BarStream;");
        InputHandler fooStream = executionPlanRuntime.getInputHandler("FooStream");
        executionPlanRuntime.start();

        Event[] outgoing = {
                new Event(System.currentTimeMillis(), new Object[]{"WSO2"}),
                new Event(System.currentTimeMillis(), new Object[]{"IBM"}),
                new Event(System.currentTimeMillis(), new Object[]{"WSO2"})
        };
        fooStream.send(outgoing);

        // waitTime and timeout are defined outside this method.
        SiddhiTestHelper.waitForEvents(waitTime, expectedEvents, eventCount, timeout);
        if (eventCount.get() != expectedEvents) {
            throw new AssertionError("Expected " + expectedEvents
                    + " events from RabbitMQ but received " + eventCount.get());
        }
    } finally {
        if (executionPlanRuntime != null) {
            executionPlanRuntime.shutdown();
        }
        siddhiAppRuntime.shutdown();
    }
}
参考 Siddhi 查询指南:Siddhi-source
/**
 * Reproduction case from the question: assembles a Siddhi app that declares a RabbitMQ
 * source, starts the runtime, and prints every event handed to a callback on "FooStream".
 *
 * This is the snippet reported as not receiving any messages; the inline review notes
 * point at the places that differ from the working example given in the answer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
// The whole Siddhi app is built as a single string.
String siddhiApp = "@App:name('TestExecutionPlan') "
// NOTE(review): the attribute is spelled "teste" here, but the query below selects "test".
+ "define stream FooStream (teste string); "
// NOTE(review): @info and @source are written directly before the BarStream definition,
// not before FooStream or the query -- presumably Siddhi binds them to BarStream;
// confirm against the Siddhi query guide.
+ "@info(name = 'query1') "
+ "@source(type ='rabbitmq', "
+ "uri = 'amqp://test:test@192.168.99.100:5672', "
+ "exchange.name = 'amq.topic', "
+ "exchange.type = 'topic', "
+ "routing.key= '#', "
+ "queue.name = 'siddhi-queue', "
+ "@map(type='text')) "
+ "Define stream BarStream (test string); "
+ "from FooStream select test insert into BarStream; ";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
// NOTE(review): the runtime is started before the callback is registered, and the
// callback listens on "FooStream" rather than "BarStream".
siddhiAppRuntime.start();
siddhiAppRuntime.addCallback("FooStream", new StreamCallback() {
// Prints each batch of events delivered to the callback.
public void receive(Event[] event) {
EventPrinter.print(event);
}
});
}
此代码无法从 rabbitmq 检索事件消息。
我可以在 rabbitmq 管理界面(dashboard)中看到已建立的连接和通道,而且所有发布到 exchange 的消息都被投递到了其他已绑定的队列。
抱歉各位,我发现了问题。
log4j.properties 未配置。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%m%n
您需要为 "BarStream" 添加回调,因为它才是事件来源(@source 注解定义在该流上)。您可以尝试以下示例。
/**
 * Round-trip example for the RabbitMQ extension.
 *
 * <p>One Siddhi app declares a RabbitMQ {@code @source} in front of "BarStream" and counts
 * the events delivered to a callback on that stream. A second app declares a RabbitMQ
 * {@code @sink} in front of its own "BarStream" and forwards everything sent to
 * "FooStream" into it. Three events are pushed into the second app and the test then
 * waits for the first app to receive them.
 *
 * <p>Both runtimes are shut down in a {@code finally} block so that a failure while
 * sending or waiting does not leave them running, and the received count is checked
 * explicitly so the test fails when the events do not arrive.
 *
 * @throws InterruptedException if sending the events or waiting for them is interrupted
 */
@Test
public void rabbitmqSourceTest() throws InterruptedException {
    final int expectedEvents = 3;
    AtomicInteger eventCount = new AtomicInteger(0);
    String siddhiApp = "@App:name('TestExecutionPlan') "
            + "define stream FooStream (test string); "
            + "@info(name = 'query1') "
            + "@source(type ='rabbitmq', "
            + "uri = 'amqp://guest:guest@172.17.0.2:5672', "
            + "exchange.name = 'amq.topic', "
            + "exchange.type = 'topic', "
            + "routing.key= '#',"
            + "queue.name = 'siddhi-queue', "
            + "@map(type='text')) "
            + "define stream BarStream (test string); ";
    SiddhiManager siddhiManager = new SiddhiManager();
    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
    // Register the callback before starting, on the stream the @source annotation precedes.
    siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                EventPrinter.print(event);
                eventCount.incrementAndGet();
            }
        }
    });

    SiddhiAppRuntime executionPlanRuntime = null;
    try {
        siddhiAppRuntime.start();

        // Publisher side: events sent to FooStream are inserted into BarStream,
        // which is the stream the @sink annotation precedes.
        executionPlanRuntime = siddhiManager.createSiddhiAppRuntime(
                "@App:name('TestExecutionPlan') " +
                "define stream FooStream (test string); " +
                "@info(name = 'query1') " +
                "@sink(type ='rabbitmq', uri = 'amqp://guest:guest@172.17.0.2:5672', " +
                "exchange.type='topic', " +
                "exchange.name = 'amq.topic', " +
                "@map(type='text'))" +
                "Define stream BarStream (test string);" +
                "from FooStream select test insert into BarStream;");
        InputHandler fooStream = executionPlanRuntime.getInputHandler("FooStream");
        executionPlanRuntime.start();

        Event[] outgoing = {
                new Event(System.currentTimeMillis(), new Object[]{"WSO2"}),
                new Event(System.currentTimeMillis(), new Object[]{"IBM"}),
                new Event(System.currentTimeMillis(), new Object[]{"WSO2"})
        };
        fooStream.send(outgoing);

        // waitTime and timeout are defined outside this method.
        SiddhiTestHelper.waitForEvents(waitTime, expectedEvents, eventCount, timeout);
        if (eventCount.get() != expectedEvents) {
            throw new AssertionError("Expected " + expectedEvents
                    + " events from RabbitMQ but received " + eventCount.get());
        }
    } finally {
        if (executionPlanRuntime != null) {
            executionPlanRuntime.shutdown();
        }
        siddhiAppRuntime.shutdown();
    }
}
参考 Siddhi 查询指南:Siddhi-source