将循环阻塞代码重写为 SwiftNIO 风格的非阻塞代码

Rewriting looping blocking code to SwiftNIO style non-blocking code

我正在开发一个从网络读取数据的驱动程序。它事先不知道响应中有多少数据,只知道当某次读取返回 0 字节时,读取就完成了。所以我的阻塞式 Swift 代码很天真地写成了这样:

/// Reads from the network until a read yields no bytes and returns everything received.
///
/// Each call to `read()` blocks the calling thread until it returns. An empty chunk marks
/// the end of the response.
///
/// - Returns: All bytes read, in the order they were received.
func readAllBlocking() -> [Byte] {

  var fullBuffer: [Byte] = []

  while true {
    let chunk = read() // synchronous, blocking
    // An empty chunk is the only end-of-response signal we get.
    guard !chunk.isEmpty else { break }
    // `append(contentsOf:)` adds the chunk's bytes; plain `append(_:)` expects a single `Byte`.
    fullBuffer.append(contentsOf: chunk)
  }

  return fullBuffer
}

如何将其重写为一个 promise(承诺),让它持续运行直到读取完整个结果?我绞尽脑汁想了很久,仍然卡在这里:

// Desired non-blocking counterpart of `readAllBlocking()`. The body is intentionally left
// open: how to fill it in is the question being asked.
func readAllNonBlocking() -> EventLoopFuture<[Byte]> {

  // ...? (implementation unknown; this is what the question asks for)
}

我应该补充一点:我可以把 read() 从返回 [Byte] 改写为返回 EventLoopFuture<[Byte]>。

通常,同步编程中的循环会变成递归,以获得与使用 futures 的异步编程(以及函数式编程)相同的效果。

因此您的函数可能如下所示:

/// Reads chunks with `read()` until an empty chunk arrives, then delivers every byte received.
///
/// - Parameter eventLoop: The event loop on which the result promise is created and on which
///   the accumulated bytes are mutated.
/// - Returns: A future that succeeds with all accumulated bytes once `read()` yields an empty
///   chunk, or fails with the error of the first failed read.
func readAllNonBlocking(on eventLoop: EventLoop) -> EventLoopFuture<[Byte]> {
    // Carries the overall result back to the caller.
    let allBytesPromise = eventLoop.makePromise(of: [Byte].self)

    // Everything received so far.
    var received: [Byte] = []

    // Recursion takes the place of the loop in the blocking version.
    func readNextChunk() {
        // Hop to `eventLoop` before touching `received`, so every mutation happens on the
        // same event loop and no lock is needed.
        let chunkHandled = read().hop(to: eventLoop).map { chunk -> Void in
            received.append(contentsOf: chunk)
            if chunk.isEmpty {
                // An empty chunk means we are done: publish what we collected.
                allBytesPromise.succeed(received)
            } else {
                // More data may follow, so go around again.
                readNextChunk()
            }
        }
        // A failed read fails the whole operation.
        chunkHandled.cascadeFailure(to: allBytesPromise)
    }

    // Start the first read.
    readNextChunk()

    return allBytesPromise.futureResult
}

不过我想补充两点:

首先,你上面实现的只是简单地读取所有内容,直到遇到 EOF。如果该软件暴露在互联网上,你一定要限制内存中最多保留的字节数。

其次,SwiftNIO 是一个事件驱动系统,因此如果您使用 SwiftNIO 读取这些字节,程序实际上看起来会略有不同。如果您对在 SwiftNIO 中简单地累加所有字节直到 EOF 是什么样子感兴趣,那就是:

/// A decoder that produces no message while data is still arriving and, once EOF has been seen,
/// forwards everything buffered so far as a single `ByteBuffer`.
struct AccumulateUntilEOF: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    /// Called when new data has come in.
    ///
    /// The only message boundary is EOF, so this always asks for more data. Asking for more data
    /// is what makes `ByteToMessageHandler` keep accumulating the bytes on our behalf.
    func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        return .needMoreData
    }

    /// Called when NIO knows this is the last decode call, usually because of EOF or an error.
    ///
    /// - Parameters:
    ///   - buffer: The bytes received so far; always cleared before this method returns.
    ///   - seenEOF: Whether the final call was triggered by EOF.
    func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
        // Whichever path is taken, the buffered bytes are consumed before returning.
        defer { buffer.clear() }

        guard seenEOF else {
            // No EOF: probably an error, or this handler was removed from the pipeline.
            // `buffer` holds what arrived so far; it is dropped rather than forwarded.
            return .needMoreData
        }

        // EOF reached: `buffer` should hold all the bytes, so send them down the pipeline.
        context.fireChannelRead(self.wrapInboundOut(buffer))
        return .needMoreData
    }
}

如果你想用 SwiftNIO 制作一个完整的程序,这里有一个例子,它是一个服务器,它接受所有数据,直到它看到 EOF,然后写回接收到的字节数:)。当然,在现实世界中你永远不会保留所有接收到的字节来计算它们(你可以只添加每个单独的部分)但我想它可以作为一个例子。

import NIO

// The event loop group (a single thread) that is handed to the `ServerBootstrap` further down.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
// Shut the group down when the enclosing scope exits. `try!` turns a failed shutdown into a crash.
defer {
    try! group.syncShutdownGracefully()
}

/// A decoder that produces no message while data is still arriving and, once EOF has been seen,
/// forwards everything buffered so far as a single `ByteBuffer`.
struct AccumulateUntilEOF: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    /// Called when new data has come in.
    ///
    /// The only message boundary is EOF, so this always asks for more data. Asking for more data
    /// is what makes `ByteToMessageHandler` keep accumulating the bytes on our behalf.
    func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        return .needMoreData
    }

    /// Called when NIO knows this is the last decode call, usually because of EOF or an error.
    ///
    /// - Parameters:
    ///   - buffer: The bytes received so far; always cleared before this method returns.
    ///   - seenEOF: Whether the final call was triggered by EOF.
    func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
        // Whichever path is taken, the buffered bytes are consumed before returning.
        defer { buffer.clear() }

        guard seenEOF else {
            // No EOF: probably an error, or this handler was removed from the pipeline.
            // `buffer` holds what arrived so far; it is dropped rather than forwarded.
            return .needMoreData
        }

        // EOF reached: `buffer` should hold all the bytes, so send them down the pipeline.
        context.fireChannelRead(self.wrapInboundOut(buffer))
        return .needMoreData
    }
}

// Just an example "business logic" handler. It will wait for one message
// and just write back the length.
/// Example "business logic" handler: for a received `ByteBuffer` it writes back the number of
/// readable bytes as a decimal line of text and then closes the channel.
final class SendBackLengthOfFirstInput: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    /// Replies with the length of the received message, then closes the channel in two steps.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let received = self.unwrapInboundIn(data)
        let reply = "\(received.readableBytes)\n"

        // Build the response buffer holding the length text.
        var outgoing = context.channel.allocator.buffer(capacity: 10)
        outgoing.writeString(reply)

        // Write the reply, then close our output side, then close the channel entirely.
        // Each step only runs if the previous one succeeded.
        let written = context.writeAndFlush(self.wrapOutboundOut(outgoing))
        let outputClosed = written.flatMap {
            context.close(mode: .output)
        }
        outputClosed.whenSuccess {
            context.close(promise: nil)
        }
    }

    /// Logs the error and closes the channel without waiting for the close to finish.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("ERROR: \(error)")
        context.channel.close(promise: nil)
    }
}

// Describe the listening socket and the pipeline that every accepted connection receives.
let bootstrap = ServerBootstrap(group: group)
    // Lets the port be reused after the process quits.
    .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
    // Half-closure must be allowed: the reply is written after EOF was received on the input.
    .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
    .childChannelInitializer { channel in
        // Part 1: accumulate everything until EOF, buffering at most 1 MB.
        // Without `maximumBufferSize` it would buffer indefinitely.
        let accumulator = ByteToMessageHandler(AccumulateUntilEOF(),
                                               maximumBufferSize: 1024 * 1024)
        // Part 2: the "business logic".
        let businessLogic = SendBackLengthOfFirstInput()
        return channel.pipeline.addHandlers([accumulator, businessLogic])
    }

// Bind to port 9999 on the loopback interface and wait until the server is listening.
let listenAddress = try SocketAddress(ipAddress: "127.0.0.1", port: 9999)
let server = try bootstrap.bind(to: listenAddress).wait()

// Block until the server channel closes; in this program that never happens on its own.
try server.closeFuture.wait()

演示:

$ echo -n "hello world" | nc localhost 9999
11