如何清理底层语句为 "INSERT INTO" 的 EPSstatement#iterator()

How to clean up EPStatement#iterator() whose underlying statement is "INSERT INTO"

我正在调查 Esper 5.5.0 的一个问题。在我正在处理的代码库中,使用了“INSERT INTO”语句,它使用 EPSstatement#iterator() 从“INSERT INTO”语句中提取数据。它 return 是一个非空迭代器(虽然我觉得这很奇怪)。

问题是 Iterator 不断累积数据并且永远不会被清理。我试图找到一种方法来清理 Iterator 中的数据,但我不知道该怎么做。它的 remove() 方法抛出一个异常,并且从派生的 window 中删除数据对对应于“INSERT INTO”语句的 EPStatement 对象没有任何影响。如何清理 Iterator 中与“INSERT INTO”语句对应的数据? (EDIT:不是派生的window,而是“INSERT INTO”语句本身)

不幸的是,我什至无法创建一个简单的复制器。他们做了类似下面的事情,但是当我尝试在新代码中复制该行为时,迭代器总是空的。我还想知道复制该行为缺少什么。

public class MyTest {
    @Test
    void eplStatementReturningNonEmptyIterator() {
        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider();
        EPRuntime rt = engine.getEPRuntime();
        EPAdministrator adm = engine.getEPAdministrator();
        adm.getConfiguration().addEventType(PersonEvent.class);
        adm.createEPL("create window PersonEventWindow.win:keepall() as PersonEvent");
        EPStatement epl = adm.createEPL("insert into PersonEventWindow select irstream * from PersonEvent");

        rt.sendEvent(new PersonEvent("foo", 1));
        rt.sendEvent(new PersonEvent("bar", 2));

        // passes, but this question is not about this one
        assert count(rt.executeQuery("select * from PersonEventWindow").iterator()) == 2;
       
        // This question is about this one, I want to clean up the Iterator which epl.iterator() returns
        assert count(epl.iterator()) == 2; 

        // (this assertion ^ fails actually; I cannot even replicate the issue)
    }

    private static int count(Iterator<?> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static class PersonEvent {
        private String name;
        private int age;
        public PersonEvent(String name, int age) {
            this.name = name;
            this.age = age;
        }
        public String getName() {
            return name;
        }
        public int getAge() {
            return age;
        }
    }
}

此代码创建名为 window PersonEventWindow#keepall 的名称,用于保存所有到达的事件(keepall 的含义)。该代码执行即发即弃查询 rt.executeQuery,从命名的 window 中选择所有事件,返回的迭代器提供每个事件。我认为迭代器不允许删除。一种选择是使用时间 window 自动从命名的 window 中删除数据,例如 PersonEventWindow#time(10) 只保留最后 10 秒。另一种选择,执行像 rt.executeQuery("delete from PersonEventWindow") 这样的即发即弃查询,它会删除命名 window.

中的所有事件

事实证明,如果迭代器从 window 中选择“插入到...”returns 个元素。为了清理迭代器,我们可以从 window 中删除数据,“插入到”查询从中选择数据。

下面的代码验证了我相信的解释:

public class MyTest3 {
    EPServiceProvider engine;
    EPAdministrator epa;
    EPRuntime epr;

    @BeforeEach
    void setUp() {
        engine = EPServiceProviderManager.getDefaultProvider();
        epa = engine.getEPAdministrator();
        epr = engine.getEPRuntime();
    }

    @Test
    @DisplayName("The Iterator gets cleaned up by delete from MyWindow")
    void cleanUpIterator() {
        epa.getConfiguration().addEventType(MyEvent.class);
        epa.createEPL("create window MyWindow.std:unique(id) as MyEvent");
        epa.createEPL("insert into   MyWindow select id from    MyEvent");
        epr.sendEvent(new MyEvent(1));
        epr.sendEvent(new MyEvent(2));
        EPStatement insertIntoAnotherWindow = epa.createEPL("insert into AnotherWindow select id from MyWindow");
        assertThat(count(insertIntoAnotherWindow.iterator())).isEqualTo(2); // this returns the events surprisingly

        epr.executeQuery("delete from MyWindow");

        assertThat(count(insertIntoAnotherWindow.iterator())).isEqualTo(0); // now it's empty
    }

    public static class MyEvent {
        private final int id;

        public MyEvent(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }

    @AfterEach
    void tearDown() {
        engine.destroy();
    }

    private static int count(Iterator<?> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }
}