Flink 作业提交抛出 java.nio.file.NoSuchFileException 而文件实际存在
Flink Job submission throws java.nio.file.NoSuchFileException while the file actually exists
我试图提交一个已经打包在 JAR 中的 flink 作业。基本上它使用受 SASL 身份验证保护的 kafka 主题,因此它需要一个 .jks 文件,我已经将它们包含在 JAR 中并在代码中读取为:
try(InputStream resourceStream = loader.getResourceAsStream(configFile)){
properties.load(resourceStream);
properties.setProperty("ssl.truststore.location",
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath());
}
catch(Exception e){
System.out.println("Failed to load config");
}
为了测试,我尝试在两个不同(不同 VM 规格)的独立服务器上提交作业。一台服务器成功运行,但另一台服务器抛出 java.nio.file.NoSuchFileException
,说找不到我的 .jks 文件。有人可以指出它可能存在的问题吗?
这里flink部署在单机集群模式,版本如下:
- Flink版本:
1.14.0
- Java版本:
11.0.13
我意识到我的问题真的很愚蠢。这部分实际上 returns null 并触发异常。
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath()
问题是我通过网络提交作业 UI 因此我看不到打印的消息。因此,文件名解析为存储在 configFile 下的原始文件名,这是一个相对路径。为什么一台机器工作而另一台机器不工作?因为我以前不知何故在我的 homedir 上有 .jks
用于另一个测试:).
为了避免其他人跳入此错误,这里总结了如果 运行 来自 IDE(gradle 运行 将 .getResource()
解决的问题task) 和 jar。
// file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toString());
// home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").getPath());
// home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// null
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI().getPath());
// file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI());
kafka-client:2.4.1
org.apache.kafka.common.security.ssl.SslEngineBuilder#285
try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}
看来我们应该将 jks 文件放在任务管理器可以通过绝对路径访问的文件系统(nfs 或 hdfs)中。
我试图提交一个已经打包在 JAR 中的 flink 作业。基本上它使用受 SASL 身份验证保护的 kafka 主题,因此它需要一个 .jks 文件,我已经将它们包含在 JAR 中并在代码中读取为:
try(InputStream resourceStream = loader.getResourceAsStream(configFile)){
properties.load(resourceStream);
properties.setProperty("ssl.truststore.location",
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath());
}
catch(Exception e){
System.out.println("Failed to load config");
}
为了测试,我尝试在两个不同(不同 VM 规格)的独立服务器上提交作业。一台服务器成功运行,但另一台服务器抛出 java.nio.file.NoSuchFileException
,说找不到我的 .jks 文件。有人可以指出它可能存在的问题吗?
这里flink部署在单机集群模式,版本如下:
- Flink版本:
1.14.0
- Java版本:
11.0.13
我意识到我的问题真的很愚蠢。这部分实际上 returns null 并触发异常。
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath()
问题是我通过网络提交作业 UI 因此我看不到打印的消息。因此,文件名解析为存储在 configFile 下的原始文件名,这是一个相对路径。为什么一台机器工作而另一台机器不工作?因为我以前不知何故在我的 homedir 上有 .jks
用于另一个测试:).
为了避免其他人跳入此错误,这里总结了如果 运行 来自 IDE(gradle 运行 将 .getResource()
解决的问题task) 和 jar。
// file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toString());
// home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").getPath());
// home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// null
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI().getPath());
// file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI());
kafka-client:2.4.1 org.apache.kafka.common.security.ssl.SslEngineBuilder#285
try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}
看来我们应该将 jks 文件放在任务管理器可以通过绝对路径访问的文件系统(nfs 或 hdfs)中。