如何在 Flink 中连接 2 个以上的流?
How to connect more than 2 streams in Flink?
我有 3 个不同类型的键控数据流。
DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
每个流都有自己定义的处理逻辑,并在它们之间共享一个状态。我想连接这 3 个流,只要任何流中的数据可用,就会触发相应的处理函数。可以连接两个流。
first.connect(second).process(<CoProcessFunction>)
我不能使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。
除了联合,标准方法是在级联中使用连接,例如,
first.connect(second).process(...).connect(third).process(...)
您将无法在一处共享所有三个流之间的状态。无论后续流程功能需要什么,您都可以让第一个流程功能输出,但第三个流将无法影响第一个流程功能中的状态,这对于某些用例来说是个问题。
另一种可能是利用 lower-level 机制——有关详细信息,请参阅 FLIP-92: Add N-Ary Stream Operator in Flink. However, this mechanism is intended for internal use (the Table/SQL API uses this for n-way joins), and would need to be treated with caution. See the mailing list discussion。我提到这个是为了完整性,但我怀疑这是一个好主意,直到界面得到进一步开发。
您可能还想看看 stateful functions api,它克服了数据流 api 的许多限制。
包装方法确实不错。您可以创建一个类似于 Flink 现有 Either<Left, Right>
的 EitherOfThree<T1, T2, T3>
包装器 class,然后在单个函数中处理这些记录的流。类似于:
DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
.union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
.union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
combo.process(new MyProcessFunction());
Flink 的 Either
class 有一个更优雅的实现,但对于您的用例,一些简单的东西应该可以工作。
我有 3 个不同类型的键控数据流。
DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
每个流都有自己定义的处理逻辑,并在它们之间共享一个状态。我想连接这 3 个流,只要任何流中的数据可用,就会触发相应的处理函数。可以连接两个流。
first.connect(second).process(<CoProcessFunction>)
我不能使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。
除了联合,标准方法是在级联中使用连接,例如,
first.connect(second).process(...).connect(third).process(...)
您将无法在一处共享所有三个流之间的状态。无论后续流程功能需要什么,您都可以让第一个流程功能输出,但第三个流将无法影响第一个流程功能中的状态,这对于某些用例来说是个问题。
另一种可能是利用 lower-level 机制——有关详细信息,请参阅 FLIP-92: Add N-Ary Stream Operator in Flink. However, this mechanism is intended for internal use (the Table/SQL API uses this for n-way joins), and would need to be treated with caution. See the mailing list discussion。我提到这个是为了完整性,但我怀疑这是一个好主意,直到界面得到进一步开发。
您可能还想看看 stateful functions api,它克服了数据流 api 的许多限制。
包装方法确实不错。您可以创建一个类似于 Flink 现有 Either<Left, Right>
的 EitherOfThree<T1, T2, T3>
包装器 class,然后在单个函数中处理这些记录的流。类似于:
DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
.union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
.union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
combo.process(new MyProcessFunction());
Flink 的 Either
class 有一个更优雅的实现,但对于您的用例,一些简单的东西应该可以工作。