Direct buffer memory error when connecting to MQTT
We are running an Apache Beam application on a Flink cluster.
After a few days the application fails with the following error:
Caused by: javax.net.ssl.SSLException: failure when writing TLS control frames
at io.netty.handler.ssl.SslHandler.setHandshakeFailureTransportFailure(SslHandler.java:1870)
at io.netty.handler.ssl.SslHandler.access0(SslHandler.java:167)
at io.netty.handler.ssl.SslHandler.operationComplete(SslHandler.java:985)
at io.netty.handler.ssl.SslHandler.operationComplete(SslHandler.java:980)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:183)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at io.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:980)
at io.netty.handler.ssl.SslHandler.handshake(SslHandler.java:2046)
at io.netty.handler.ssl.SslHandler.startHandshakeProcessing(SslHandler.java:1966)
at io.netty.handler.ssl.SslHandler.channelActive(SslHandler.java:2101)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:305)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:335)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
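The message above leaves two possibilities open: too little direct memory, or a leak. Logging direct-buffer usage over time from inside the job may help tell them apart. This is a minimal sketch using the standard BufferPoolMXBean; the class name DirectMemoryProbe is made up for illustration. It assumes the allocations go through the JVM's own accounting. The "Direct buffer memory" OutOfMemoryError suggests they do, but some libraries may also allocate native memory that these counters do not see.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of Flink, Beam, or the HiveMQ client.
final class DirectMemoryProbe {

    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if no such pool is reported. */
    static long directUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    /** One line per buffer pool the JVM reports, suitable for periodic logging. */
    static String snapshot() {
        StringBuilder sb = new StringBuilder();
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            sb.append(String.format("pool=%s buffers=%d used=%d capacity=%d%n",
                    pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(snapshot());
    }
}
```

If the used value of the direct pool keeps climbing between snapshots while the load is stable, that points toward a leak rather than an undersized 'taskmanager.memory.task.off-heap.size'.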
The connection is established with the following method:
private Mqtt5AsyncClient setupClient() {
    Mqtt5ClientBuilder mqttClientBuilder = Mqtt5Client.builder().identifier("beam-" + UUID.randomUUID().toString())
            .serverHost(properties.getServerHost()).serverPort(properties.getServerPort());
    if (properties.getUsername() != null && properties.getPassword() != null) {
        mqttClientBuilder = mqttClientBuilder.simpleAuth().username(properties.getUsername())
                .password(properties.getPassword().getBytes()).applySimpleAuth();
    } else if (properties.getUsername() != null || properties.getPassword() != null) {
        LoggerFactory.getLogger(getClass()).error("Both username and password must be provided!");
    }
    if (properties.isSslEnabled()) { // Add ssl config if ssl is enabled
        try {
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            ks.load(new ByteArrayInputStream(truststore), properties.getTrustStorePassword().toCharArray());
            tmf.init(ks);
            if (properties.skipHostnameVerification()) { // Disable host name verification if required
                mqttClientBuilder = mqttClientBuilder.sslConfig().trustManagerFactory(tmf)
                        .hostnameVerifier(new NoopHostnameVerifier()).applySslConfig();
            } else {
                mqttClientBuilder = mqttClientBuilder.sslConfig().trustManagerFactory(tmf).applySslConfig();
            }
        } catch (NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e) {
            LoggerFactory.getLogger(getClass()).error("Error while setting up ssl", e);
        }
    }
    Mqtt5BlockingClient newClient = mqttClientBuilder.buildBlocking();
    newClient.connect();
    mqttClientCount.inc();
    return newClient.toAsync();
}
But it runs on my machine
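When I start the application from my Eclipse project, everything works fine. The error only happens on the Flink cluster, which makes debugging a bit difficult.
The only thing that changed on the server was an update to openjdk-11.0.14. Updating Java on my own machine to the same version does not produce the error at all.
So I am running out of ideas about what causes the error.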
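Since the failure shows up only on the cluster, printing the same JVM facts in both environments may narrow things down. This is a minimal sketch; the class name JvmEnvironmentReport is made up for illustration. The JVM arguments are included because, as far as I know, Flink starts task managers with an explicit -XX:MaxDirectMemorySize, which a plain Eclipse launch would not set unless configured.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper for comparing a local JVM with the one on the cluster.
final class JvmEnvironmentReport {

    /** Collects version, vendor, install path, and startup arguments of the running JVM. */
    static String describe() {
        return String.join(System.lineSeparator(),
                "java.version=" + System.getProperty("java.version"),
                "java.vendor=" + System.getProperty("java.vendor"),
                "java.vm.name=" + System.getProperty("java.vm.name"),
                "java.home=" + System.getProperty("java.home"),
                "runtime.version=" + Runtime.version(),
                "max.heap.bytes=" + Runtime.getRuntime().maxMemory(),
                // Flags such as -XX:MaxDirectMemorySize show up here if they were passed at startup.
                "jvm.args=" + ManagementFactory.getRuntimeMXBean().getInputArguments());
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Running this once from Eclipse and once inside the job (for example, logged at startup) gives two reports that can be compared line by line. A differing java.home or vendor would also reveal whether both sides really run the same JDK build.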
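So I checked the following:
- Is the SSL certificate valid --> yes
- Are the username and password correct --> yes
- Are there any logs on the MQTT side --> no
It seems the Java application fails before it connects to the MQTT broker, because there are no login attempts on the broker.
We are using spring-boot-2.1.18 and hivemq-mqtt-client:1.2.2. The broker is VerneMQ.
Any suggestions are welcome.
Thanks in advance.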
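So I fixed it.
I don't know exactly what the problem was.
I restored a backup of my VM from a time when everything was working. Then I updated openjdk-11 to the latest version. After that the error appeared again.
So it really was the update to openjdk-11-jdk 11.0.14 that caused the problem.
After removing and reinstalling openjdk, everything worked.
Maybe this will help someone.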