如何用Kafka consumer编写Flink CEP的Junit测试代码

How to write Junit test code for Flink CEP with Kafka consumer

我们有一个 execute() 方法,我们使用 FlinkKafkaConsumer08 作为我们的 Flink CEP 源,然后我们有 CEP 模式,警报再次进入另一个 kafka 主题。如何为这个 execute() 方法编写一个 junit 测试用例?任何人都可以为此提供一个示例 junit 代码吗?

Pattern.<WebConnectionUseCase>begin("start")
                .where(new SimpleCondition<WebConnectionUseCase>() {
                    public boolean filter(WebConnectionUseCase event) {
                        return ((event.getValues().getPredictedAvailableMemory()
                                - event.getValues().getAvailableMemory()) >= STARTDIFF);
                    }
                }).followedBy("middle").where(new IterativeCondition<WebConnectionUseCase>() {
                    public boolean filter(WebConnectionUseCase value, Context<WebConnectionUseCase> ctx)
                            throws Exception {

                        Iterable<WebConnectionUseCase> middleStops = ctx.getEventsForPattern("middle");
                        List<Double> diffMemoryList = new ArrayList<Double>();
                        List<Double> connectionList = new ArrayList<Double>();
                        middleStops.forEach(item -> diffMemoryList.add(item.getValues().getPredictedAvailableMemory()
                                - item.getValues().getAvailableMemory()));
                        middleStops.forEach(item -> connectionList.add(item.getValues().getConnection()));
                        return checkIncreasingPattern(diffMemoryList) && checkDecreasingPattern(connectionList);
                    }

                    private boolean checkDecreasingPattern(List<Double> list) {
                        //code
                    }

                    private boolean checkIncreasingPattern(List<Double> list) {
                        // code
                    }
                }).times(PATTERNCOUNT).consecutive().next("end").where(new SimpleCondition<WebConnectionUseCase>() {
                    @Override
                    public boolean filter(WebConnectionUseCase event) {
                        return ((event.getValues().getPredictedAvailableMemory()
                                - event.getValues().getAvailableMemory()) >= ENDDIFF);
                    }
                }).within(Time.minutes(TIMEOUTDURATION));

我会将您要测试的部分封装在一个对象中,您可以连接到特殊源和接收器进行测试,并连接到实时数据 sources/sinks 进行生产。

对于测试接收器,您可以使用:

public static class TestSink<OUT> implements SinkFunction<OUT> {

    // must be static
    public static final List values = new ArrayList<>();

    @Override
    public void invoke(OUT value, Context context) throws Exception {
        values.add(value);
    }
}

然后您的测试可以将 sink.values 与预期结果进行比较。

编写执行事件时间处理的测试(与使用处理时间的测试相比)更容易,因为处理时间的结果是不确定的。而且更容易进行 运行 并行度为 1 的测试,也是为了获得确定性结果。

您会找到一些测试示例 here