Swift-NIO TCP 客户端自动重新连接
Swift-NIO TCP Client auto reconnect
我在Swift-NIO中写了一个TCP Client来连接Netty TCP Server。
我希望 tcp 客户端可以在需要时自动重新连接。
import Foundation
import NIO
/// Inbound handler that prints every received message, re-broadcasts it through
/// `NotificationCenter`, and starts a repeating reconnect attempt once its channel goes inactive.
class MessageHandler: ChannelInboundHandler {
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    /// Notification name under which each received message is posted.
    let notificationMessage = NSNotification.Name(rawValue: "Location")

    private var numBytes = 0
    /// Repeating reconnect job created by `channelInactive(context:)` on this instance.
    private var task: RepeatedTask? = nil
    private var bootstrap: ClientBootstrap

    init(bootstrap: ClientBootstrap) {
        self.bootstrap = bootstrap
    }

    /// Logs the connection, cancels this instance's reconnect job (if it has one) and forwards the event.
    public func channelActive(context: ChannelHandlerContext) {
        print("Reconnect Successful")
        if let pending = self.task {
            pending.cancel()
        }
        context.fireChannelActive()
    }

    /// Schedules a connect attempt that fires immediately and then every 10 seconds,
    /// remembers the job in `task`, and forwards the inactive event.
    func channelInactive(context: ChannelHandlerContext) {
        let loop = context.channel.eventLoop
        self.task = loop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(10)) { _ in
            print("Reconnecting...")
            // The returned future is intentionally dropped; nothing observes the outcome here.
            _ = try self.bootstrap.connect(host: SystemUtil.getConfig(key: "IP") as! String, port: SystemUtil.getConfig(key: "TCPPort") as! Int)
        }
        context.fireChannelInactive()
    }

    /// Reads all readable bytes as a string, prints it and posts it under the "Location" key.
    /// The read is not forwarded to later handlers.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var incoming = unwrapInboundIn(data)
        let byteCount = incoming.readableBytes
        guard let text = incoming.readString(length: byteCount) else {
            return
        }
        print(text)
        let payload = ["Location": text]
        NotificationCenter.default.post(
            name: notificationMessage,
            object: MessageHandler.self,
            userInfo: payload
        )
    }

    /// Prints the error and closes the channel.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error: ", error)
        // Nobody waits for the close result, so no promise is allocated for it.
        context.close(promise: nil)
    }
}
代码基本可以工作,但有些地方不对。
我使用 eventLoop.scheduleRepeatedTask 每 10 秒尝试连接一次,并在连接成功后取消 RepeatedTask。但是 self.task?.cancel() 不起作用,于是我查看了 cancel 的源代码。
取消 RepeatedTask 的正确方法是什么?谢谢
/// Cancels the repeated task: cancels and drops the pending `scheduled` work, clears `task`,
/// and succeeds any cancellation promises on a later event-loop turn.
///
/// The call is checked with `assertInEventLoop()`, so it is expected to run on `eventLoop`.
///
/// - Parameter localCancellationPromise: Optional promise that is succeeded, together with the
///   stored `cancellationPromise`, once cancellation has been carried out.
private func cancel0(localCancellationPromise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()
// Cancel whatever is still scheduled, then clear both stored references.
self.scheduled?.cancel()
self.scheduled = nil
self.task = nil
// Possible states at this time are:
// 1) Task is scheduled but has not yet executed.
// 2) Task is currently executing and invoked `cancel()` on itself.
// 3) Task is currently executing and `cancel0()` has been reentrantly invoked.
// 4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution)
// 5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2).
// 6) Task has completed execution in a failure state.
// 7) Task has been fully cancelled at a previous time.
//
// It is desirable that the task has fully completed any execution before any cancellation promise is
// fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring
// fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing
// them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the
// promises for nil so as not to otherwise invoke `execute()` unnecessarily.
if self.cancellationPromise != nil || localCancellationPromise != nil {
self.eventLoop.execute {
self.cancellationPromise?.succeed(())
localCancellationPromise?.succeed(())
}
}
}
是的,task 为 nil,所以取消不起作用。我把这个变量改成了 static:
// `static` stores a single `task` on the type itself instead of one per handler instance.
// NOTE(review): presumably every reconnect builds a fresh handler, which would explain why the
// per-instance property was nil — confirm against the bootstrap's channel initializer.
static var task: RepeatedTask? = nil
现在工作正常。
但我仍然不确定 Swift-NIO 中自动重新连接的最佳做法是什么。
在我的 Android 应用程序中,我使用 Netty 作为 TCP 客户端
/**
 * Background thread that builds a Netty client bootstrap, connects to the configured
 * server (blocking until the connect completes) and stores the resulting channel.
 * Any exception raised along the way is logged under the "SystemService" tag.
 */
private inner class ConnectServerThread : Thread() {
    override fun run() {
        super.run()
        val workerGroup = NioEventLoopGroup()
        try {
            val bootstrap = Bootstrap()

            // Pipeline for every new channel: reconnect handling, UTF-8 string codec, message handling.
            val pipelineSetup = object : ChannelInitializer<SocketChannel>() {
                public override fun initChannel(ch: SocketChannel) {
                    ch.pipeline().addLast(
                        ReconnectHandler(bootstrap, channel),
                        StringEncoder(StandardCharsets.UTF_8),
                        StringDecoder(StandardCharsets.UTF_8),
                        MessageHandlerAdapter()
                    )
                }
            }

            bootstrap.group(workerGroup)
                .channel(NioSocketChannel::class.java)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_REUSEADDR, true)
                .handler(pipelineSetup)

            val serverAddress = InetSocketAddress(
                ConfigUtil.config!!.ip,
                ConfigUtil.config!!.tcpPort!!.toInt()
            )
            // sync() blocks this thread until the connect attempt has finished.
            val channelFuture = bootstrap.connect(serverAddress).sync()
            channelFuture.addListener {
                getConnectionListener()
            }
            channel = channelFuture.channel() as SocketChannel
        } catch (e: Exception) {
            Log.d("SystemService", e.toString())
        }
    }
}
我使用 ReconnectHandler 进行重新连接,使用 getConnectionListener 进行侦听。
Swift-NIO中是否有类似的Listener或者其他方案?
在 SwiftNIO 中,解决方案是让您的处理程序把回调附加到 connect
返回的 future(EventLoopFuture)上。这些回调可以通过闭包捕获(close over)重复任务,因此可以在连接完成时取消它。例如:
import Foundation
import NIO
/// Retries `bootstrap.connect` on a fixed schedule until one attempt succeeds.
final class Reconnector {
    /// The retry schedule currently running, or `nil` when no reconnect is in progress.
    private var task: RepeatedTask? = nil
    private let bootstrap: ClientBootstrap

    init(bootstrap: ClientBootstrap) {
        self.bootstrap = bootstrap
    }

    /// Starts reconnecting: the first attempt fires immediately, later ones every 10 seconds.
    ///
    /// - Parameter loop: The event loop on which the attempts are scheduled.
    func reconnect(on loop: EventLoop) {
        // Cancel a schedule left over from an earlier call; otherwise it would be overwritten,
        // keep firing forever and could no longer be cancelled through `task`.
        self.task?.cancel()
        self.task = loop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(10)) { _ in
            print("reconnecting")
            try self._tryReconnect()
        }
    }

    /// Issues one connect attempt and stops the retry schedule once it succeeds.
    /// A failed attempt is ignored here; the schedule simply tries again on its next run.
    private func _tryReconnect() throws {
        // NOTE(review): the success callback may run on a different event loop than the one the
        // schedule lives on, and `task` is not synchronised — confirm this is acceptable.
        try self.bootstrap.connect(host: SystemUtil.getConfig(key: "IP") as! String, port: SystemUtil.getConfig(key: "TCPPort") as! Int).whenSuccess { _ in
            print("reconnect successful!")
            self.task?.cancel()
            self.task = nil
        }
    }
}
/// Inbound handler that prints every received message, re-broadcasts it through
/// `NotificationCenter`, and asks its `Reconnector` to re-establish the connection
/// once the channel goes inactive.
class MessageHandler: ChannelInboundHandler {
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    /// Notification name under which each received message is posted.
    let notificationMessage = NSNotification.Name(rawValue: "Location")

    // Not read anywhere in this handler; retained from the original sample.
    private var numBytes = 0
    // Declared under the same name that `init` and `channelInactive` use (was `reconnect`,
    // which did not compile).
    private let reconnector: Reconnector

    init(bootstrap: ClientBootstrap) {
        self.reconnector = Reconnector(bootstrap: bootstrap)
    }

    /// Starts the reconnect schedule on this channel's event loop and forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
        // `reconnect(on:)` requires the loop to schedule on; the original call omitted it.
        self.reconnector.reconnect(on: context.channel.eventLoop)
        context.fireChannelInactive()
    }

    /// Reads all readable bytes as a string, prints it and posts it under the "Location" key.
    /// The read is not forwarded to later handlers.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var buffer = unwrapInboundIn(data)
        let readableBytes = buffer.readableBytes
        guard let message = buffer.readString(length: readableBytes) else {
            return
        }
        print(message)
        let dictMessage = ["Location": message]
        NotificationCenter.default.post(name: notificationMessage, object: MessageHandler.self, userInfo: dictMessage)
    }

    /// Prints the error and closes the channel.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error: ", error)
        // As we are not really interested getting notified on success or failure we just pass nil as promise to
        // reduce allocations.
        context.close(promise: nil)
    }
}
我在Swift-NIO中写了一个TCP Client来连接Netty TCP Server。 我希望 tcp 客户端可以在需要时自动重新连接。
import Foundation
import NIO
/// Inbound handler that prints every received message, re-broadcasts it through
/// `NotificationCenter`, and starts a repeating reconnect attempt once its channel goes inactive.
class MessageHandler: ChannelInboundHandler {
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    /// Notification name under which each received message is posted.
    let notificationMessage = NSNotification.Name(rawValue: "Location")

    private var numBytes = 0
    /// Repeating reconnect job created by `channelInactive(context:)` on this instance.
    private var task: RepeatedTask? = nil
    private var bootstrap: ClientBootstrap

    init(bootstrap: ClientBootstrap) {
        self.bootstrap = bootstrap
    }

    /// Logs the connection, cancels this instance's reconnect job (if it has one) and forwards the event.
    public func channelActive(context: ChannelHandlerContext) {
        print("Reconnect Successful")
        if let pending = self.task {
            pending.cancel()
        }
        context.fireChannelActive()
    }

    /// Schedules a connect attempt that fires immediately and then every 10 seconds,
    /// remembers the job in `task`, and forwards the inactive event.
    func channelInactive(context: ChannelHandlerContext) {
        let loop = context.channel.eventLoop
        self.task = loop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(10)) { _ in
            print("Reconnecting...")
            // The returned future is intentionally dropped; nothing observes the outcome here.
            _ = try self.bootstrap.connect(host: SystemUtil.getConfig(key: "IP") as! String, port: SystemUtil.getConfig(key: "TCPPort") as! Int)
        }
        context.fireChannelInactive()
    }

    /// Reads all readable bytes as a string, prints it and posts it under the "Location" key.
    /// The read is not forwarded to later handlers.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var incoming = unwrapInboundIn(data)
        let byteCount = incoming.readableBytes
        guard let text = incoming.readString(length: byteCount) else {
            return
        }
        print(text)
        let payload = ["Location": text]
        NotificationCenter.default.post(
            name: notificationMessage,
            object: MessageHandler.self,
            userInfo: payload
        )
    }

    /// Prints the error and closes the channel.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error: ", error)
        // Nobody waits for the close result, so no promise is allocated for it.
        context.close(promise: nil)
    }
}
代码基本可以工作,但有些地方不对。 我使用 eventLoop.scheduleRepeatedTask 每 10 秒尝试连接一次,并在连接成功后取消 RepeatedTask。但是 self.task?.cancel() 不起作用,于是我查看了 cancel 的源代码。 取消 RepeatedTask 的正确方法是什么?谢谢
/// Cancels the repeated task: cancels and drops the pending `scheduled` work, clears `task`,
/// and succeeds any cancellation promises on a later event-loop turn.
///
/// The call is checked with `assertInEventLoop()`, so it is expected to run on `eventLoop`.
///
/// - Parameter localCancellationPromise: Optional promise that is succeeded, together with the
///   stored `cancellationPromise`, once cancellation has been carried out.
private func cancel0(localCancellationPromise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()
// Cancel whatever is still scheduled, then clear both stored references.
self.scheduled?.cancel()
self.scheduled = nil
self.task = nil
// Possible states at this time are:
// 1) Task is scheduled but has not yet executed.
// 2) Task is currently executing and invoked `cancel()` on itself.
// 3) Task is currently executing and `cancel0()` has been reentrantly invoked.
// 4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution)
// 5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2).
// 6) Task has completed execution in a failure state.
// 7) Task has been fully cancelled at a previous time.
//
// It is desirable that the task has fully completed any execution before any cancellation promise is
// fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring
// fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing
// them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the
// promises for nil so as not to otherwise invoke `execute()` unnecessarily.
if self.cancellationPromise != nil || localCancellationPromise != nil {
self.eventLoop.execute {
self.cancellationPromise?.succeed(())
localCancellationPromise?.succeed(())
}
}
}
是的,task 为 nil,所以取消不起作用。我把这个变量改成了 static:
// `static` stores a single `task` on the type itself instead of one per handler instance.
// NOTE(review): presumably every reconnect builds a fresh handler, which would explain why the
// per-instance property was nil — confirm against the bootstrap's channel initializer.
static var task: RepeatedTask? = nil
现在工作正常。
但我仍然不确定 Swift-NIO 中自动重新连接的最佳做法是什么。 在我的 Android 应用程序中,我使用 Netty 作为 TCP 客户端
/**
 * Background thread that builds a Netty client bootstrap, connects to the configured
 * server (blocking until the connect completes) and stores the resulting channel.
 * Any exception raised along the way is logged under the "SystemService" tag.
 */
private inner class ConnectServerThread : Thread() {
    override fun run() {
        super.run()
        val workerGroup = NioEventLoopGroup()
        try {
            val bootstrap = Bootstrap()

            // Pipeline for every new channel: reconnect handling, UTF-8 string codec, message handling.
            val pipelineSetup = object : ChannelInitializer<SocketChannel>() {
                public override fun initChannel(ch: SocketChannel) {
                    ch.pipeline().addLast(
                        ReconnectHandler(bootstrap, channel),
                        StringEncoder(StandardCharsets.UTF_8),
                        StringDecoder(StandardCharsets.UTF_8),
                        MessageHandlerAdapter()
                    )
                }
            }

            bootstrap.group(workerGroup)
                .channel(NioSocketChannel::class.java)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_REUSEADDR, true)
                .handler(pipelineSetup)

            val serverAddress = InetSocketAddress(
                ConfigUtil.config!!.ip,
                ConfigUtil.config!!.tcpPort!!.toInt()
            )
            // sync() blocks this thread until the connect attempt has finished.
            val channelFuture = bootstrap.connect(serverAddress).sync()
            channelFuture.addListener {
                getConnectionListener()
            }
            channel = channelFuture.channel() as SocketChannel
        } catch (e: Exception) {
            Log.d("SystemService", e.toString())
        }
    }
}
我使用 ReconnectHandler 进行重新连接,使用 getConnectionListener 进行侦听。 Swift-NIO中是否有类似的Listener或者其他方案?
在 SwiftNIO 中,解决方案是让您的处理程序把回调附加到 connect
返回的 future(EventLoopFuture)上。这些回调可以通过闭包捕获(close over)重复任务,因此可以在连接完成时取消它。例如:
import Foundation
import NIO
/// Retries `bootstrap.connect` on a fixed schedule until one attempt succeeds.
final class Reconnector {
    /// The retry schedule currently running, or `nil` when no reconnect is in progress.
    private var task: RepeatedTask? = nil
    private let bootstrap: ClientBootstrap

    init(bootstrap: ClientBootstrap) {
        self.bootstrap = bootstrap
    }

    /// Starts reconnecting: the first attempt fires immediately, later ones every 10 seconds.
    ///
    /// - Parameter loop: The event loop on which the attempts are scheduled.
    func reconnect(on loop: EventLoop) {
        // Cancel a schedule left over from an earlier call; otherwise it would be overwritten,
        // keep firing forever and could no longer be cancelled through `task`.
        self.task?.cancel()
        self.task = loop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(10)) { _ in
            print("reconnecting")
            try self._tryReconnect()
        }
    }

    /// Issues one connect attempt and stops the retry schedule once it succeeds.
    /// A failed attempt is ignored here; the schedule simply tries again on its next run.
    private func _tryReconnect() throws {
        // NOTE(review): the success callback may run on a different event loop than the one the
        // schedule lives on, and `task` is not synchronised — confirm this is acceptable.
        try self.bootstrap.connect(host: SystemUtil.getConfig(key: "IP") as! String, port: SystemUtil.getConfig(key: "TCPPort") as! Int).whenSuccess { _ in
            print("reconnect successful!")
            self.task?.cancel()
            self.task = nil
        }
    }
}
/// Inbound handler that prints every received message, re-broadcasts it through
/// `NotificationCenter`, and asks its `Reconnector` to re-establish the connection
/// once the channel goes inactive.
class MessageHandler: ChannelInboundHandler {
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    /// Notification name under which each received message is posted.
    let notificationMessage = NSNotification.Name(rawValue: "Location")

    // Not read anywhere in this handler; retained from the original sample.
    private var numBytes = 0
    // Declared under the same name that `init` and `channelInactive` use (was `reconnect`,
    // which did not compile).
    private let reconnector: Reconnector

    init(bootstrap: ClientBootstrap) {
        self.reconnector = Reconnector(bootstrap: bootstrap)
    }

    /// Starts the reconnect schedule on this channel's event loop and forwards the event.
    func channelInactive(context: ChannelHandlerContext) {
        // `reconnect(on:)` requires the loop to schedule on; the original call omitted it.
        self.reconnector.reconnect(on: context.channel.eventLoop)
        context.fireChannelInactive()
    }

    /// Reads all readable bytes as a string, prints it and posts it under the "Location" key.
    /// The read is not forwarded to later handlers.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var buffer = unwrapInboundIn(data)
        let readableBytes = buffer.readableBytes
        guard let message = buffer.readString(length: readableBytes) else {
            return
        }
        print(message)
        let dictMessage = ["Location": message]
        NotificationCenter.default.post(name: notificationMessage, object: MessageHandler.self, userInfo: dictMessage)
    }

    /// Prints the error and closes the channel.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error: ", error)
        // As we are not really interested getting notified on success or failure we just pass nil as promise to
        // reduce allocations.
        context.close(promise: nil)
    }
}