如何在 SwiftNIO 中使用工作队列?

How to use worker queues in SwiftNIO?

我有一个 Swift NIO HTTP2 服务器,它在上下文的事件循环中处理请求。但是我想在另一个线程中处理请求,GCD aync线程池并取回结果并发送。

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    context.eventLoop.execute {
       context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in 
            // ...
            var buffer = context.channel.allocator.buffer(capacity: respBody.count)
            buffer.writeString(respBody)        
            context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
            return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
        }.whenComplete { _ in
            context.close(promise: nil)
        }
    }
}

如果我将其更改为使用 GCD 全局队列,我将如何 return EventLoopFuture<Void> 响应?

context.eventLoop.execute {
    context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
        DispatchQueue.global().async {
            return self.send("hello world new ok", to: context.channel).whenComplete({ _ in
                _ = context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
                context.close(promise: nil)
            })
        }
    }
}

可以这样使用 GCD 全局队列吗?或者我将如何使用工作线程?


发送字符串函数调用下面的函数写入正文。

private func sendData(_ data: Data, to channel: Channel, context: StreamContext) -> EventLoopFuture<Void> {
    let headers = self.getHeaders(contentLength: data.count, context: context)
    _ = self.sendHeader(status: .ok, headers: headers, to: channel, context: context)
    var buffer = channel.allocator.buffer(capacity: data.count)
    buffer.writeBytes(data)
    let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
    return channel.writeAndFlush(part)
}

SwiftNIO 中的规则是:

  • Channel 上的操作是线程安全的,因此您可以从任何线程或队列执行它们
  • ChannelHandlerContext 的操作 不是 线程安全的,只能在 ChannelHandler 中完成。 ChannelHandler 的所有事件都在右侧 EventLoop.
  • 调用

所以你的例子几乎是正确的,只要确保只使用 Channel 而不是来自 DispatchQueue 或任何其他线程(不是频道的线程)的 ChannelHandlerContext EventLoop).

let channel = context.channel // save on the EventLoop
channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
    DispatchQueue.global().async {
        self.send("hello world new ok", to: channel).flatMap {
            channel.writeAndFlush(HTTPServerResponsePart.end(nil))
        }.whenComplete {
            channel.close(promise: nil)
        }
    }
}

我在这里做的一个假设是 self.send 可以从任何线程调用并且不使用您可能存储在 self 上的 ChannelHandlerContext .要评估 self.send 在这里是否合适,我需要知道它到底做了什么。


顺便说一句,在您的第一个代码片段中,您有多余的 eventloop.execute:

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
   // eventLoop.execute not necessary here
   context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in 
        // ...
        var buffer = context.channel.allocator.buffer(capacity: respBody.count)
        buffer.writeString(respBody)        
        context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
        return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
    }.whenComplete { _ in
        context.close(promise: nil)
    }
}

context.eventLoop.execute 是不必要的,因为 ChannelHandler 上的任何事件 总是 在正确的 EventLoop.

中调用