如何向 flink CEP 数据流添加新事件?

how to add new event to flink CEP data stream?

我正在使用flink 1.5.2 来解决一个CEP 问题。

我的数据来自一个列表,当系统处于 运行 时,一些其他进程将向该列表添加新的事件对象。它不是套接字或网络消息。我一直在阅读官方网站示例。以下是我认为应该执行的步骤。

  1. 使用 env.fromCollection(list);
  2. 创建数据流
  3. 定义一个 Pattern 模式
  4. 使用 CEP.pattern(data_stream, pattern)
  5. 获取一个 PatternStream
  6. 使用pattern_stream.select( ...实现select接口...)将复杂事件结果作为DataStream

但是我的输入流应该是无界的。我没有在 DataStream<> 对象中找到任何 add() 方法。我该如何做到这一点?而且,我是否需要告诉 DataStream<> 何时清理过时的事件?

仅当使用预先固定的有界输入集时,如编写测试或只是试验时,集合才适合作为 Flink 的输入源。如果你想要一个无界流,你需要选择一个不同的源,比如套接字或像 Kafka 这样的消息队列系统。

套接字很容易用于实验。在 Linux 和 MacOS 系统上,您可以使用

nc -lk 9999

创建一个 Flink 可以绑定到端口 9999 的套接字,并且您提供的任何输入 nc (netcat) 都将一次一行地流式传输到您的 Flink 作业中。 Netcat 也可用于 Windows,但未预安装。

但是,您不应该计划在生产中使用套接字,因为它们无法倒带(这对于在故障恢复期间使用 Flink 获得准确结果至关重要)。