使用 Apache Flink 的分布式执行和 class fields/members 的(反)序列化

Distributed execution with Apache Flink and (de)serialization of class fields/members

我有一个简单的 Flink 作业概念证明。它基本上从 Kafka 主题接收消息(JSON 格式),将它们反序列化为域模型,根据一些预定义的规则集验证这些消息,应用一些转换,最后将结果消息发布到 Kafka 接收器中。

我确实有几个 functions/operators 使用了其他“服务”类 的某些行为。这些“服务”类 还可以导入一些其他依赖项。

据我所知,Flink 会尝试(反)序列化那些 functions/operators 以使整个作业真正分布式。我不清楚 Flink 是否会通过在 fields/members 上使用 transient 来自动避免这种情况,或者是否足以将它们声明为 static 来避免这种情况。

这是我所拥有的示例:

public final class SomeFlatMapFunction implements FlatMapFunction<SomeMessage, Some> {
  private static final long serialVersionUID = -5810858761065889162L;

  private static final SomeMapper MAPPER = SomeMapper.INSTANCE;

  private static final Validator VALIDATOR = Validator.INSTANCE;

  @Override
  public void flatMap(final SomeMessage value, final Collector<Some> out) {
    final var result = MAPPER.valueFrom(value);
    final var violations = VALIDATOR.getValidator().validate(result);
    if (violations.isEmpty()) {
      out.collect(result);
    }
  }
}

到目前为止我还没有发现任何问题,但我只是 运行 本地应用程序。这里的 best/accepted 方法是什么,即使对于那些可能必须在函数的构造函数中注入这些依赖项的情况也是如此?非常不鼓励在这些函数之间维护状态。

运算符确实得到序列化和反序列化, 这就是为什么有几个 Rich* 版本的运算符带有 openclose 方法, 它们可用于在 反序列化 之后设置事物, 一旦操作员已经在任务管理器中,它将 运行。 Flink 会遵守 Java 的常规序列化规则,不会序列化 statictransient 成员。

根据我的经验,在运算符的构造函数中注入域 类 不是问题。 您需要注意的是在作业 运行ning 时通过网络的域 类, 有时称为数据传输对象。 对于这些,最简单的方法是将它们实现为 POJO,其中有两点很关键:

  • 它们必须有一个 no-argument 构造函数。
  • 它们应该用类型信息进行注释。请参阅 .
  • 中的示例

如果此类 POJO 将成为您应用程序 状态 的一部分,则第二点尤为重要, 即如果您使用 Flink 的托管状态 API.

还有一些你已经考虑过的事情: 添加 serialVersionUID 也是一个好主意。