如何在 Flink 中连接 2 个以上的流?

How to connect more than 2 streams in Flink?

我有 3 个不同类型的键控数据流。

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;

每个流都有自己定义的处理逻辑,并在它们之间共享一个状态。我想连接这 3 个流,只要任何流中的数据可用,就会触发相应的处理函数。可以连接两个流。

first.connect(second).process(<CoProcessFunction>)

我不能使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。

除了联合,标准方法是在级联中使用连接,例如,

first.connect(second).process(...).connect(third).process(...)

您将无法在一处共享所有三个流之间的状态。无论后续流程功能需要什么,您都可以让第一个流程功能输出,但第三个流将无法影响第一个流程功能中的状态,这对于某些用例来说是个问题。

另一种可能是利用 lower-level 机制——有关详细信息,请参阅 FLIP-92: Add N-Ary Stream Operator in Flink. However, this mechanism is intended for internal use (the Table/SQL API uses this for n-way joins), and would need to be treated with caution. See the mailing list discussion。我提到这个是为了完整性,但我怀疑这是一个好主意,直​​到界面得到进一步开发。

您可能还想看看 stateful functions api,它克服了数据流 api 的许多限制。

包装方法确实不错。您可以创建一个类似于 Flink 现有 Either<Left, Right>EitherOfThree<T1, T2, T3> 包装器 class,然后在单个函数中处理这些记录的流。类似于:

    DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
        .union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
        .union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
    combo.process(new MyProcessFunction());

Flink 的 Either class 有一个更优雅的实现,但对于您的用例,一些简单的东西应该可以工作。