Flink中如何管理数据源连接池

How to Manage Data Source Connection Pool in Flink

我正在尝试 运行 Flink 客户端应用程序,我正在从文件中读取数据。 文件中的每条记录都应使用 ProcessFunction 运算符根据数据库中的记录进行验证。我已经配置了一个具有单吨设计模式的数据源。我有以下问题

  1. 此数据源是否将在所有任务管理器和任务槽之间共享?
  2. 使用 Open 方法在处理器中创建数据源连接是否是一个好主意,只有一个作为最大连接池大小?
   public void open(Configuration parameters) throws Exception {
       //1. If i open a Connection Here
       //2. Shall i declare a Data Source COnnection here with only 1 as Maximum No of Connections
       super.open(parameters);
   }

如果我正确理解了您的用例,那么在外部数据库中将 Lookup Join 与 table 一起实现会更容易。

如果你必须自己实现这个,如果你使用 Flink 的 async i/o operator,而不是在 ProcessFunction 中向数据库发出同步请求,效果会更好。 (在Flink的用户函数中做阻塞i/o容易出问题,应该避免。)

但为了帮助回答您原来的问题:

给定任务管理器中的所有任务槽都在同一个 JVM 中。但是每个任务管理器都在一个单独的 JVM 中。每个任务槽都有自己的流程函数实例;每个实例将 运行 在不同的线程中。

不可能有一个在任务管理器之间共享连接的全局连接池。在多槽任务管理器中可以使用 static class 建立跨槽共享的连接池,但使用 static classes 这种方式在 Flink 中被认为是一种反模式。它可能导致死锁,并且还需要小心 class 加载(静态意味着每个 classloader 一个实例,因此您必须确保 class 由父 [=32] 加载=]loader,通过将 class 放在 /lib 中,或者通过配置 classloader.parent-first-patterns.additional (docs) 来获取这个特定的 class).

有关为什么不应该这样做的更多信息,请观看 https://youtu.be/F7HQd3KX2TQ?t=1407

连接池可能是一个好主意,它与 Flink 的异步 i/o 运算符结合使用。每个运算符实例都在管理对外部数据库或服务的并发请求池,使用连接池可以提高性能。但是许多异步客户端库已经这样做了,在这种情况下就没有必要自己做。