如何向 flink CEP 数据流添加新事件?
how to add new event to flink CEP data stream?
我正在使用flink 1.5.2 来解决一个CEP 问题。
我的数据来自一个列表,当系统处于 运行 时,一些其他进程将向该列表添加新的事件对象。它不是套接字或网络消息。我一直在阅读官方网站示例。以下是我认为应该执行的步骤。
- 使用 env.fromCollection(list);
创建数据流
- 定义一个 Pattern 模式
- 使用 CEP.pattern(data_stream, pattern)
获取一个 PatternStream
- 使用pattern_stream.select( ...实现select接口...)将复杂事件结果作为DataStream
但是我的输入流应该是无界的。我没有在 DataStream<> 对象中找到任何 add() 方法。我该如何做到这一点?而且,我是否需要告诉 DataStream<> 何时清理过时的事件?
仅当使用预先固定的有界输入集时,如编写测试或只是试验时,集合才适合作为 Flink 的输入源。如果你想要一个无界流,你需要选择一个不同的源,比如套接字或像 Kafka 这样的消息队列系统。
套接字很容易用于实验。在 Linux 和 MacOS 系统上,您可以使用
nc -lk 9999
创建一个 Flink 可以绑定到端口 9999 的套接字,并且您提供的任何输入 nc
(netcat) 都将一次一行地流式传输到您的 Flink 作业中。 Netcat 也可用于 Windows,但未预安装。
但是,您不应该计划在生产中使用套接字,因为它们无法倒带(这对于在故障恢复期间使用 Flink 获得准确结果至关重要)。
我正在使用flink 1.5.2 来解决一个CEP 问题。
我的数据来自一个列表,当系统处于 运行 时,一些其他进程将向该列表添加新的事件对象。它不是套接字或网络消息。我一直在阅读官方网站示例。以下是我认为应该执行的步骤。
- 使用 env.fromCollection(list); 创建数据流
- 定义一个 Pattern 模式
- 使用 CEP.pattern(data_stream, pattern) 获取一个 PatternStream
- 使用pattern_stream.select( ...实现select接口...)将复杂事件结果作为DataStream
但是我的输入流应该是无界的。我没有在 DataStream<> 对象中找到任何 add() 方法。我该如何做到这一点?而且,我是否需要告诉 DataStream<> 何时清理过时的事件?
仅当使用预先固定的有界输入集时,如编写测试或只是试验时,集合才适合作为 Flink 的输入源。如果你想要一个无界流,你需要选择一个不同的源,比如套接字或像 Kafka 这样的消息队列系统。
套接字很容易用于实验。在 Linux 和 MacOS 系统上,您可以使用
nc -lk 9999
创建一个 Flink 可以绑定到端口 9999 的套接字,并且您提供的任何输入 nc
(netcat) 都将一次一行地流式传输到您的 Flink 作业中。 Netcat 也可用于 Windows,但未预安装。
但是,您不应该计划在生产中使用套接字,因为它们无法倒带(这对于在故障恢复期间使用 Flink 获得准确结果至关重要)。