Flink 作业提交抛出 java.nio.file.NoSuchFileException 而文件实际存在

Flink Job submission throws java.nio.file.NoSuchFileException while the file actually exists

我试图提交一个已经打包在 JAR 中的 flink 作业。基本上它使用受 SASL 身份验证保护的 kafka 主题,因此它需要一个 .jks 文件,我已经将它们包含在 JAR 中并在代码中读取为:

try(InputStream resourceStream = loader.getResourceAsStream(configFile)){
        properties.load(resourceStream);
        properties.setProperty("ssl.truststore.location",
            loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath());
      }

catch(Exception e){
        System.out.println("Failed to load config");
      }

为了测试,我尝试在两个不同(不同 VM 规格)的独立服务器上提交作业。一台服务器成功运行,但另一台服务器抛出 java.nio.file.NoSuchFileException,说找不到我的 .jks 文件。有人可以指出它可能存在的问题吗?

这里flink部署在单机集群模式,版本如下:

我意识到我的问题真的很愚蠢。这部分实际上 returns null 并触发异常。

loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath()

问题是我通过网络提交作业 UI 因此我看不到打印的消息。因此,文件名解析为存储在 configFile 下的原始文件名,这是一个相对路径。为什么一台机器工作而另一台机器不工作?因为我以前不知何故在我的 homedir 上有 .jks 用于另一个测试:).

为了避免其他人跳入此错误,这里总结了如果 运行 来自 IDE(gradle 运行 将 .getResource() 解决的问题task) 和 jar。

//      file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
      System.out.println(loader.getResource("kafka-client.trustore.jks").toString());

//      home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
      System.out.println(loader.getResource("kafka-client.trustore.jks").getPath());

//      home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      null
      System.out.println(loader.getResource("kafka-client.trustore.jks").toURI().getPath());

//      file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
      System.out.println(loader.getResource("kafka-client.trustore.jks").toURI());

kafka-client:2.4.1 org.apache.kafka.common.security.ssl.SslEngineBuilder#285

try (InputStream in = Files.newInputStream(Paths.get(path))) {
    KeyStore ks = KeyStore.getInstance(type);
    // If a password is not set access to the truststore is still available, but integrity checking is disabled.
    char[] passwordChars = password != null ? password.value().toCharArray() : null;
    ks.load(in, passwordChars);
    return ks;
  } catch (GeneralSecurityException | IOException e) {
    throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}

看来我们应该将 jks 文件放在任务管理器可以通过绝对路径访问的文件系统(nfs 或 hdfs)中。