将 EPL 模块中的 subscriber/listener 附加到具有上下文分区的语句
Attaching a subscriber/listener within EPL module to a statement with context partitions
我有以下成功部署的 EPL 模块:
module context;
import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;
@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);
@Name('createSchemaEvent')
create schema TickEvent as TickEvent;
@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;
@Name('compareStocks')
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode = context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >= context.initEvent.bias and
B.stockCode = context.initEvent.secondStock
);
我对 listeners/subscribers 有疑问。根据我的检查和调试,classes 没有任何问题,注释有效,它们在部署时附加到语句,但它们都没有收到来自事件的任何更新。
这是我的订阅者,我只想打印它已收到:
package subscribers;
import java.util.Map;
public class MySubscriber {
public void update(Map row) {
System.out.println("got it");
}
}
我以前有没有任何上下文分区的相同模块,然后订户可以正常工作。添加上下文后,它停止了。
到目前为止我已经尝试过:
- 检查语句是否有任何 subscriber/listener 附加(有)
- 检查他们的名字
- 删除注释并在部署后在 Java 代码中手动设置它们(同样的事情 - 它们附加,我可以检索它们的名称但仍然没有收到更新)
- 调试订户class。该程序要么根本不去那里在断点处停止,要么我得到一个错误(缺少行号属性错误 - ("can't place a break point there" 我试图修复无济于事)
知道是什么原因导致的,或者将订阅者设置为具有上下文分区的语句的最佳方法是什么?
这是之前已在此处解决的问题的延续 -
编辑:以我使用的格式和 EPL 在线工具格式发送的事件:
我首先从用户那里得到要关注的对:
System.out.println("First stock:");
String first = scanner.nextLine();
System.out.println("Second stock:");
String second = scanner.nextLine();
System.out.println("Difference:");
double diff= scanner.nextDouble();
InitEvent init = new InitEvent(first, second, diff);
之后我有一个引擎线程连续发送事件,但在它开始之前 InitEvents 是这样发送的:
@Override
public void run() {
runtime.sendEvent(initEvent);
while (contSimulation) {
TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
runtime.sendEvent(tick1);
TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
runtime.sendEvent(tick2);
TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
runtime.sendEvent(tick3);
TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
runtime.sendEvent(tick4);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
}
}
我以前没有使用过在线工具,但我想我可以用了。这是模块文本:
module context;
create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);
create context TwoStocksContext
initiated by InitEvent as initEvent;
context TwoStocksContext
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode = context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >= context.initEvent.bias and
B.stockCode = context.initEvent.secondStock
);
以及事件的顺序:
InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}
TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}
我使用它们得到的结果:
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH',
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH',
b_stockCode='GOO'}
如果我使第二组事件在 YAH/GOO 之间的差异小于 5,我只会从第一对事件中获得有意义的输出。这是,我认为它应该做的。
如果需要,这两个方法读取并处理 EPL 模块的注释(我不是自己写的,它们取自 coinTrader Context class,可在此处找到 - https://github.com/timolson/cointrader/blob/master/src/main/java/org/cryptocoinpartners/module/Context.java):
private static Object getSubscriber(String className) throws Exception {
Class<?> cl = Class.forName(className);
return cl.newInstance();
}
private static void processAnnotations(EPStatement statement) throws Exception {
Annotation[] annotations = statement.getAnnotations();
for (Annotation annotation : annotations) {
if (annotation instanceof Subscriber) {
Subscriber subscriber = (Subscriber) annotation;
Object obj = getSubscriber(subscriber.className());
System.out.println(subscriber.className());
statement.setSubscriber(obj);
} else if (annotation instanceof Listeners) {
Listeners listeners = (Listeners) annotation;
for (String className : listeners.classNames()) {
Class<?> cl = Class.forName(className);
Object obj = cl.newInstance();
if (obj instanceof StatementAwareUpdateListener) {
statement.addListener((StatementAwareUpdateListener) obj);
} else {
statement.addListener((UpdateListener) obj);
}
}
}
}
}
嗯,折腾了一个月终于解决了。如果将来有人遇到类似问题,这就是问题所在。 epl 在在线工具中运行良好,但在我的代码中运行不正常。最终,我发现初始事件没有触发,因此没有创建上下文分区,因此订阅者和侦听器没有收到任何更新。我的错误是我触发了 POJO InitEvent,但是上下文使用的事件是通过创建模式在 EPL 模块中创建的。我不知道我在想什么,现在它不起作用了,这是有道理的。因此,我在 Java 中触发的事件不是上下文使用的事件。我的解决方案仅在 EPL 内。因为我不知道我是否可以触发模块中创建的 Java 中的事件,所以我创建了一个由我的 POJO 填充的模式,然后流由上下文使用:
@Name('schemaCreator')
create schema StartEvent(firstStock string, secondStock string, difference
double);
@Name('insertInitEvent')
insert into StartEvent
select * from InitEvent;
其他所有内容保持不变,Java 代码也是如此。
我有以下成功部署的 EPL 模块:
module context;
import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;
@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);
@Name('createSchemaEvent')
create schema TickEvent as TickEvent;
@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;
@Name('compareStocks')
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode = context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >= context.initEvent.bias and
B.stockCode = context.initEvent.secondStock
);
我对 listeners/subscribers 有疑问。根据我的检查和调试,classes 没有任何问题,注释有效,它们在部署时附加到语句,但它们都没有收到来自事件的任何更新。
这是我的订阅者,我只想打印它已收到:
package subscribers;
import java.util.Map;
public class MySubscriber {
public void update(Map row) {
System.out.println("got it");
}
}
我以前有没有任何上下文分区的相同模块,然后订户可以正常工作。添加上下文后,它停止了。
到目前为止我已经尝试过:
- 检查语句是否有任何 subscriber/listener 附加(有)
- 检查他们的名字
- 删除注释并在部署后在 Java 代码中手动设置它们(同样的事情 - 它们附加,我可以检索它们的名称但仍然没有收到更新)
- 调试订户class。该程序要么根本不去那里在断点处停止,要么我得到一个错误(缺少行号属性错误 - ("can't place a break point there" 我试图修复无济于事)
知道是什么原因导致的,或者将订阅者设置为具有上下文分区的语句的最佳方法是什么?
这是之前已在此处解决的问题的延续 -
编辑:以我使用的格式和 EPL 在线工具格式发送的事件:
我首先从用户那里得到要关注的对:
System.out.println("First stock:");
String first = scanner.nextLine();
System.out.println("Second stock:");
String second = scanner.nextLine();
System.out.println("Difference:");
double diff= scanner.nextDouble();
InitEvent init = new InitEvent(first, second, diff);
之后我有一个引擎线程连续发送事件,但在它开始之前 InitEvents 是这样发送的:
@Override
public void run() {
runtime.sendEvent(initEvent);
while (contSimulation) {
TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
runtime.sendEvent(tick1);
TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
runtime.sendEvent(tick2);
TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
runtime.sendEvent(tick3);
TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
runtime.sendEvent(tick4);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
}
}
我以前没有使用过在线工具,但我想我可以用了。这是模块文本:
module context;
create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);
create context TwoStocksContext
initiated by InitEvent as initEvent;
context TwoStocksContext
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode = context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >= context.initEvent.bias and
B.stockCode = context.initEvent.secondStock
);
以及事件的顺序:
InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}
TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}
我使用它们得到的结果:
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH',
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH',
b_stockCode='GOO'}
如果我使第二组事件在 YAH/GOO 之间的差异小于 5,我只会从第一对事件中获得有意义的输出。这是,我认为它应该做的。
如果需要,这两个方法读取并处理 EPL 模块的注释(我不是自己写的,它们取自 coinTrader Context class,可在此处找到 - https://github.com/timolson/cointrader/blob/master/src/main/java/org/cryptocoinpartners/module/Context.java):
private static Object getSubscriber(String className) throws Exception {
Class<?> cl = Class.forName(className);
return cl.newInstance();
}
private static void processAnnotations(EPStatement statement) throws Exception {
Annotation[] annotations = statement.getAnnotations();
for (Annotation annotation : annotations) {
if (annotation instanceof Subscriber) {
Subscriber subscriber = (Subscriber) annotation;
Object obj = getSubscriber(subscriber.className());
System.out.println(subscriber.className());
statement.setSubscriber(obj);
} else if (annotation instanceof Listeners) {
Listeners listeners = (Listeners) annotation;
for (String className : listeners.classNames()) {
Class<?> cl = Class.forName(className);
Object obj = cl.newInstance();
if (obj instanceof StatementAwareUpdateListener) {
statement.addListener((StatementAwareUpdateListener) obj);
} else {
statement.addListener((UpdateListener) obj);
}
}
}
}
}
嗯,折腾了一个月终于解决了。如果将来有人遇到类似问题,这就是问题所在。 epl 在在线工具中运行良好,但在我的代码中运行不正常。最终,我发现初始事件没有触发,因此没有创建上下文分区,因此订阅者和侦听器没有收到任何更新。我的错误是我触发了 POJO InitEvent,但是上下文使用的事件是通过创建模式在 EPL 模块中创建的。我不知道我在想什么,现在它不起作用了,这是有道理的。因此,我在 Java 中触发的事件不是上下文使用的事件。我的解决方案仅在 EPL 内。因为我不知道我是否可以触发模块中创建的 Java 中的事件,所以我创建了一个由我的 POJO 填充的模式,然后流由上下文使用:
@Name('schemaCreator')
create schema StartEvent(firstStock string, secondStock string, difference
double);
@Name('insertInitEvent')
insert into StartEvent
select * from InitEvent;
其他所有内容保持不变,Java 代码也是如此。