Expected Flink HA behaviour when a TaskManager fails
I have created an HA Flink v1.2 cluster made up of 1 JobManager and 2 TaskManagers, each in its own VM (not using YARN or HDFS).
After starting a job on the JobManager node, I killed one of the TaskManager instances. Right away the web dashboard showed the job being cancelled and then failing. Checking the logs:
03/06/2017 16:23:50 Flat Map(1/2) switched to DEPLOYING
03/06/2017 16:23:50 Flat Map(2/2) switched to SCHEDULED
03/06/2017 16:23:50 Flat Map(2/2) switched to DEPLOYING
03/06/2017 16:23:50 Flat Map(1/2) switched to RUNNING
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(1/2) switched to RUNNING
03/06/2017 16:23:50 Flat Map(2/2) switched to RUNNING
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(2/2) switched to RUNNING
03/06/2017 16:25:38 Flat Map(1/2) switched to FAILED
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:610)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
03/06/2017 16:25:38 Job execution switched to status FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:610)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELING
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(2/2) switched to CANCELING
03/06/2017 16:25:38 Flat Map(2/2) switched to CANCELING
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELED
03/06/2017 16:26:18 Source: Custom Source -> Flat Map(2/2) switched to CANCELED
03/06/2017 16:26:18 Flat Map(2/2) switched to CANCELED
In the job implementation I have:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                              // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)   // delay between attempts
));
My question is: shouldn't the JobManager automatically redirect all the work to the remaining, running TaskManager?
Similarly, if I start the JobManager and 1 TaskManager instance and run a job, when I later start a 2nd TaskManager instance, shouldn't it also help out with the running job?
Thanks!
First of all, the RestartStrategy has nothing to do with the HA mode. High availability concerns the availability of the JobManager. In any case, for HA to work you need at least two JobManager instances (you say you started only one).
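For reference, a minimal sketch of what a standalone HA setup looks like in Flink 1.2 (hostnames, ports and paths below are placeholders; the storage directory must live on storage reachable by all nodes). flink-conf.yaml would contain roughly:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: file:///shared/flink/ha/

and the conf/masters file lists every JobManager, one host:webui-port per line, e.g.:

jobmanager-1:8081
jobmanager-2:8081

With that in place, bin/start-cluster.sh starts all listed JobManagers and one of them is elected leader; the others stand by to take over.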
As for the RestartStrategy: when you specify a fixedDelayRestart strategy, after a failure (e.g. killing a TaskManager, as in your case) the job will be attempted again (after 10 seconds in your case). If that is not happening in your setup, you are probably missing the resources needed to run the job (I suppose you have 1 task slot per TaskManager, so when only one TaskManager is left you cannot run a job with parallelism 2 or more).
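To illustrate the slot arithmetic, here is a minimal, hypothetical job skeleton (class name, job name and the trivial pipeline are made up for the example): with 2 TaskManagers of 1 slot each, a parallelism-2 job cannot be redeployed once one TaskManager is gone, whereas a parallelism-1 job can be restarted on the surviving slot.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartableJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the parallelism within the slots that survive a TaskManager loss;
        // with 1 slot per TaskManager, parallelism 1 still fits on the remaining node.
        env.setParallelism(1);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                              // restart attempts
                Time.of(10, TimeUnit.SECONDS)   // delay between attempts
        ));

        // Placeholder pipeline standing in for the real Source -> Flat Map job.
        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();

        env.execute("restartable-job");
    }
}

Alternatively, you can give each TaskManager more slots via taskmanager.numberOfTaskSlots in flink-conf.yaml, so that the whole parallelism-2 job fits on a single surviving TaskManager.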
As for the last question: a newly added TaskManager does not get picked up by an already running job. Joining a running job that way is called dynamic scaling. You can achieve it by taking a savepoint and then rerunning the job with more resources; have a look here. Automatic rescaling is work in progress.
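For completeness, a rough sketch of that manual rescaling flow with the Flink CLI (the job id, savepoint path, parallelism and jar name below are placeholders):

# 1. Trigger a savepoint for the running job
bin/flink savepoint <jobId>
# 2. Cancel the job
bin/flink cancel <jobId>
# 3. Once the extra TaskManager (and its slots) is registered, resubmit from the savepoint with a higher parallelism
bin/flink run -s <savepointPath> -p 2 my-job.jar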