Esper 模式 timer:interval() 无效

Esper pattern timer:interval() is not working

我有这段基本的 POC 代码。我可以看到事件已经发布,但监听器没有收到任何回调。在下面的代码中,语句 pattern1 和 pattern2 的监听器都没有收到任何通知。Configuration 的 setInternalTimerEnabled 默认就是 true,我也尝试在代码中显式将其设置为 true,但问题依旧。我是不是遗漏了什么?任何线索都会非常有帮助。

/**
 * Proof-of-concept reproduction: registers two EPL statements built on
 * {@code timer:interval(1 sec)} patterns, attaches printing listeners to them,
 * and then keeps publishing {@code TestEvent} instances.
 */
public class PatternSimpleQue {

    /**
     * Configures the engine, creates both pattern statements with their
     * listeners, and sends two test events every five seconds, forever.
     *
     * @param args unused
     * @throws InterruptedException if the sleep between event batches is interrupted
     */
    public static void main(String args[]) throws InterruptedException {
        // Register the event type and explicitly enable the internal timer and timer debug logging.
        Configuration configuration = new Configuration();
        configuration.addEventType("TestEvent", TestEvent.class);
        configuration.getEngineDefaults().getThreading().setInternalTimerEnabled(true);
        configuration.getEngineDefaults().getLogging().setEnableTimerDebug(true);

        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(configuration);
        EPRuntime runtime = engine.getEPRuntime();
        EPAdministrator admin = engine.getEPAdministrator();
        // NOTE(review): runtime and admin were obtained BEFORE this initialize() call.
        // The Esper reference documentation appears to state that services obtained
        // before initialize() become invalid - confirm; this ordering may be why the
        // timer-driven patterns never fire.
        engine.initialize();

        // pattern1: after each 1-second interval, react to every following TestEvent.
        String pattern1 = "select * from pattern [every timer:interval(1 sec) -> every TestEvent]";
        // pattern2: on each 1-second interval, select qty of the last TestEvent via a subquery.
        String pattern2 = "select (select qty from TestEvent.std:lastevent()) as qty from pattern [every timer:interval(1 sec)]";

        // Listener for pattern1: prints a header and every bean in the first callback argument.
        admin.createEPL(pattern1).addListener((eventBeans, eventBeen1) -> {
            System.out.println("In event S3");
            try {
                for (EventBean anEventBean : eventBeans) {
                    System.out.println("****S3**** Pattern bean -> " + anEventBean.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        // Listener for pattern2: same shape as above with a different output prefix.
        admin.createEPL(pattern2).addListener((eventBeans, eventBeen1) -> {
            System.out.println("In event S4");
            try {
                for (EventBean anEventBean : eventBeans) {
                    System.out.println("--S4--Pattern bean -> " + " :: " + anEventBean.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Publish two events (random instance id; qty 10 / side 1, then qty 20 / side -1) every 5 seconds.
        while (true) {
            sendEvent(runtime, new TestEvent(String.valueOf(new Random().nextInt(100)), 10, 1));
            sendEvent(runtime, new TestEvent(String.valueOf(new Random().nextInt(100)), 20, -1));
            Thread.sleep(5000);
        }
    }

    /**
     * Prints the event to standard output and sends it to the given runtime.
     *
     * @param runtime   runtime that receives the event
     * @param testEvent event to print and send
     */
    private static void sendEvent(EPRuntime runtime, TestEvent testEvent) {
        System.out.println("\n-- New event: " + testEvent);
        runtime.sendEvent(testEvent);
    }
}


/**
 * Plain event bean carrying an instance identifier plus two integer
 * attributes ({@code qty} and {@code side}). All values are supplied through
 * the constructor and exposed through getters only.
 */
public class TestEvent {

    private String instanceId;
    private int qty;
    private int side;

    /**
     * Creates an event with the given attribute values.
     *
     * @param instanceId identifier stored on the event
     * @param qty        value returned by {@link #getQty()}
     * @param side       value returned by {@link #getSide()}
     */
    public TestEvent(String instanceId, int qty, int side) {
        this.instanceId = instanceId;
        this.qty = qty;
        this.side = side;
    }

    /** @return the instance identifier given at construction */
    public String getInstanceId() {
        return this.instanceId;
    }

    /** @return the {@code qty} value given at construction */
    public int getQty() {
        return this.qty;
    }

    /** @return the {@code side} value given at construction */
    public int getSide() {
        return this.side;
    }

    /**
     * Renders the event as {@code TestEvent{instanceId='...', qty=..., side=...}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TestEvent{");
        text.append("instanceId='").append(instanceId).append('\'');
        text.append(", qty=").append(qty);
        text.append(", side=").append(side);
        text.append('}');
        return text.toString();
    }
}

我通过添加额外的当前时间事件让它工作。

// Sends a CurrentTimeEvent built from currentTimeMillis() to the runtime.
// NOTE(review): the unqualified call presumably relies on a static import of
// java.lang.System.currentTimeMillis - confirm, otherwise qualify it.
runtime.sendEvent(new CurrentTimeEvent(currentTimeMillis()));

我还是不知道为什么Esper的内部计时器不工作。即使我手动启用了它。

// Explicitly switches the internal timer on through the engine's threading defaults.
configuration.getEngineDefaults().getThreading().setInternalTimerEnabled(true);

试试运行这段代码,应该可以。

/**
 * Proof of concept for timer-driven Esper patterns: registers two EPL
 * statements built on {@code timer:interval(1 sec)}, prints whatever their
 * listeners receive, and keeps publishing {@code TestEvent} instances.
 */
public class PatternSimpleQue {

    /** Pause between two batches of test events, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 5000L;

    /**
     * Configures the engine, creates both pattern statements with printing
     * listeners, and sends two test events every five seconds, forever.
     *
     * @param args unused
     * @throws InterruptedException if the sleep between event batches is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        Configuration configuration = new Configuration();
        configuration.addEventType("TestEvent", TestEvent.class);
        configuration.getEngineDefaults().getThreading().setInternalTimerEnabled(true);
        configuration.getEngineDefaults().getLogging().setEnableTimerDebug(true);

        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(configuration);
        // initialize() resets the engine, and services obtained before it are
        // documented as invalid afterwards. Fetch the runtime and administrator
        // only AFTER initializing, so statements and events use the live engine
        // whose internal timer is actually running.
        engine.initialize();
        EPRuntime runtime = engine.getEPRuntime();
        EPAdministrator admin = engine.getEPAdministrator();

        // pattern1: after each 1-second interval, react to every following TestEvent.
        String pattern1 = "select * from pattern [every timer:interval(1 sec) -> every TestEvent]";
        // pattern2: on each 1-second interval, select qty of the last TestEvent via a subquery.
        String pattern2 = "select (select qty from TestEvent.std:lastevent()) as qty from pattern [every timer:interval(1 sec)]";

        admin.createEPL(pattern1).addListener((newEvents, oldEvents) ->
                printEvents("In event S3", "****S3**** Pattern bean -> ", newEvents));
        admin.createEPL(pattern2).addListener((newEvents, oldEvents) ->
                printEvents("In event S4", "--S4--Pattern bean ->  :: ", newEvents));

        // One generator for the whole run instead of a fresh Random per event.
        Random random = new Random();
        while (true) {
            sendEvent(runtime, new TestEvent(String.valueOf(random.nextInt(100)), 10, 1));
            sendEvent(runtime, new TestEvent(String.valueOf(random.nextInt(100)), 20, -1));
            Thread.sleep(SEND_INTERVAL_MILLIS);
        }
    }

    /**
     * Prints a header line followed by one line per delivered event bean.
     *
     * @param header     line printed once per listener invocation
     * @param linePrefix text printed in front of each bean
     * @param beans      beans delivered to the listener; may be {@code null},
     *                   in which case only the header is printed
     */
    private static void printEvents(String header, String linePrefix, EventBean[] beans) {
        System.out.println(header);
        if (beans == null) {
            // Nothing was delivered in this argument; avoid iterating a null array.
            return;
        }
        for (EventBean bean : beans) {
            System.out.println(linePrefix + bean);
        }
    }

    /**
     * Prints the event, sends the current system time as a time event, then
     * sends the event itself to the given runtime.
     *
     * @param runtime   runtime that receives the events
     * @param testEvent event to print and send
     */
    private static void sendEvent(EPRuntime runtime, TestEvent testEvent) {
        System.out.println("\n-- New event: " + testEvent);
        // Kept from the original workaround. Fully qualified so it compiles
        // without a static import of System.currentTimeMillis.
        // NOTE(review): likely redundant now that the internal timer drives the
        // services in use - confirm before removing.
        runtime.sendEvent(new CurrentTimeEvent(System.currentTimeMillis()));
        runtime.sendEvent(testEvent);
    }
}

基本上,Esper 的内部计时器存在一个已报告的问题;另外还有一个类似的问题。

More on config settings.