您如何正确使用 NSStreams(无阻塞、读到数据末尾和消息重试)?

How do you correctly work with NSStreams (no blocking, read to the end of the data, and message retrying)?

我正在尝试与 NSInputStreamNSOutputStream 一起工作,但它造成了很多痛苦。

我有两个通信设备 Json。有些数据可能很长,因此 NSOutputStreamsends 将其分成多个数据包。

我需要接收不要阻塞在主线程上并且能够在尝试解析它之前读取所有需要的 json 数据包。然后继续读取剩下的json个数据包。

我需要发送不阻塞在主线程上,并且如果第一批发送失败也能完成发送数据。然后继续发送剩余的 json 数据。

我正在使用 swift,但也可以使用 objective c。

这是目前为止的代码。我的基本流助手 class:

public class StreamHelper : NSObject, NSStreamDelegate {
    static let DATA_BYTE_LENGTH = 4;

    public static func writeToOutputStream(text: String!, outputStream:NSOutputStream!) -> Int!{
        let encodedDataArray = [UInt8](text.utf8)

        var count: Int = encodedDataArray.count.littleEndian
        //convert int to pointer, which is required for the write method.
        withUnsafePointer(&count) { (pointer: UnsafePointer<Int>) -> Void in
            outputStream.write(UnsafePointer<UInt8>(pointer), maxLength: DATA_BYTE_LENGTH)
        }
        let bytesWritten = outputStream.write(encodedDataArray, maxLength: encodedDataArray.count)
        return bytesWritten;
    }

    public static func readFromInputStream(inputStream: NSInputStream!) -> String!{
        var buffer = [UInt8](count: 4096, repeatedValue: 0)
        var text = ""

        while (inputStream.hasBytesAvailable){
            let len = inputStream!.read(&buffer, maxLength: buffer.count)
            if(len > 0){
                if let output = NSString(bytes: &buffer, length: buffer.count, encoding: NSUTF8StringEncoding) as? String{
                    if (!output.isEmpty){
                        text += output
                    }
                }
            }
        }
        return text
    }
}

核心代码:

public func stream(aStream: NSStream, handleEvent eventCode: NSStreamEvent) {
    print("Reading from stream... ")
    switch (eventCode){
        case NSStreamEvent.ErrorOccurred:
            print("ErrorOccurred")
            break
        case NSStreamEvent.None:
            print("None")
            break
        case NSStreamEvent.EndEncountered:
            print("EndEncountered")
            if((aStream == inputStream) && inputStream!.hasBytesAvailable){
                // If all data hasn't been read, fall through to the "has bytes" event
            } else{
                break
            }
        case NSStreamEvent.HasBytesAvailable:
            print("HasBytesAvaible")
            let methodJson = StreamHelper.readFromInputStream(inputStream!)
            if(!methodJson.isEmpty){
                let cMethodJson = methodJson.cStringUsingEncoding(NSUTF8StringEncoding)!
                let returnedJsonString = String.fromCString(callMethod(cMethodJson))
                StreamHelper.writeToOutputStream(returnedJsonString, outputStream: outputStream!)
            }
            break
        case NSStreamEvent.OpenCompleted:
            print("OpenCompleted")
            break
        case NSStreamEvent.HasSpaceAvailable:
            print("HasSpaceAvailable")

            if(aStream == outputStream){
            }
            break
        default:
            break
    }
}

一些设置代码:

func connectToService(service: NSNetService!){

service.getInputStream(&inputStream, outputStream: &outputStream)

inputStream!.delegate = self
outputStream!.delegate = self

inputStream!.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
outputStream!.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
inputStream!.open()
outputStream!.open()
}

如何正确使用 NSStreams 或者是否有比使用 NSStreams 更好的解决方案?

你可能在这里工作太辛苦了。 NSStreamDelegate 是在 GCD 之前设计的,当时大部分 Cocoa 工作是在单个线程上完成的。虽然在某些情况下仍然有使用它的理由,但在大多数情况下,GCD 和同步方法会使它变得容易得多。例如,要阅读你会做这样的事情:

import Foundation

enum StreamError: ErrorType {
    case Error(error: NSError?, partialData: [UInt8])
}

func readStream(inputStream: NSInputStream) throws -> [UInt8] {
    let bufferSize = 1024
    var buffer = [UInt8](count: bufferSize, repeatedValue: 0)
    var data: [UInt8] = []

    while true {
        let count = inputStream.read(&buffer, maxLength: buffer.capacity)

        guard count >= 0 else {
            inputStream.close()
            throw StreamError.Error(error: inputStream.streamError, partialData: data)
        }

        guard count != 0 else {
            inputStream.close()
            return data
        }

        data.appendContentsOf(buffer.prefix(count))
    }
}

let textPath = NSBundle.mainBundle().pathForResource("text.txt", ofType: nil)!
let inputStream = NSInputStream(fileAtPath: textPath)!
inputStream.open()
do {
    let data = try readStream(inputStream)
    print(data)
} catch let err {
    print("ERROR: \(err)")
}

当然,这会阻塞当前队列。所以不要 运行 它在主队列中。将 do 块放入 dispatch_async。如果您稍后需要主队列中的数据,dispatch_async 将其返回,就像任何其他后台进程一样。

我不使用 NSStreams,而是直接通过套接字发送。
有包装器可以使这项任务更容易,最著名的是 GCDAsyncSocket。

根据其他答案,我想出了这个,它适用于 Swift 4.2.

public enum StreamError: Error {
    case Error(error: Error?, partialData: [UInt8])
}

extension InputStream {

    public func readData(bufferSize: Int = 1024) throws -> Data {
        var buffer = [UInt8](repeating: 0, count: bufferSize)
        var data: [UInt8] = []

        open()

        while true {
            let count = read(&buffer, maxLength: buffer.capacity)

            guard count >= 0 else {
                close()
                throw StreamError.Error(error: streamError, partialData: data)
            }

            guard count != 0 else {
                close()
                return Data(bytes: data)
            }

            data.append(contentsOf: (buffer.prefix(count)))
        }

    }
}