RxSwift 通过 flatMaps 链传播一个值

RxSwift propagating a value through a chain of flatMaps

所以我有一些 RxSwift 代码,我想在其中执行一系列异步操作,所有这些操作都是使用可观察对象组成的。 flatMap 是执行此操作的方法,并且效果很好,但是它似乎无法将变量传递到我能弄清楚的链中。一些伪代码最好地说明了这一点

假设有 3 个函数

class Connection {
    static func establish(address:String) -> Observable<Connection>
    func sendData(data:String) -> Observable<Int> // num bytes written or something
    func close() -> Observable<Void>
}

我想在链中调用它们,以便我们连接、发送,然后关闭。像这样

Connection.establish(host)
    .flatMap{ connection in connection.sendData("foo") }
    .flatMap{ numBytes in ????.close() }
    .subscribeNext{ /* all done */ }

问题是 flatMap 没有将它的输入参数传递到链下,因此传递给 subscribeNext 的闭包无法访问 connection 对象,因此它不能调用关闭。

我可以像下面那样做一些可怕的黑客攻击,但我真的不想这样做!

var connection:Connection?
Connection.establish(host)
    .flatMap{ c in 
        connection = c
        return c.sendData("foo") 
    }
    .flatMap{ numBytes in connection!.close() }
    .subscribeNext{ /* all done */ }

在 Rx 的 C# 版本中,这是通过对 SelectMany 的重载来解决的,它采用第二个闭包,将 2 个值(通常合并到一个元组中)然后 事物沿着链传播。我把它写成 RxSwfit 的扩展,它的工作原理如下:

Connection.establish(host)
    .flatMap(
        { connection in connection.sendData("foo") },
        combine: { ([=13=], ) }) // tupleify
    .flatMap{ (connection, numbytes) in connection.close() }
    .subscribeNext{ /* all done */ }

这一切都很好,但我的主要问题是 - 是否有更好的方法来做到这一点,它目前内置于 RxSwift 中?

另外,写这个扩展方法并不简单也不容易。我基本上通过 copy/pasting MiniRxSwift 中的那个从头开始重新实现 FlatMap 并修改它。如果我们必须编写这个扩展,是否有更好的方法使用 RxSwift 构造来实现它?

您可以在 establish 函数中执行此操作。它可能看起来像这样:

static func establish(address:String) -> Observable<Connection> {
    return Observable.create { observer in
        //create connection
        observer.onNext(connection)
        observer.onCompleted()

        return AnonymousDisposable { connection.close() }
    }
}

当你的 observer 被处理时,它也会关闭你的连接。
我认为这是这里最好的选择之一。

但是,我们可以使用 combineLatestflatMap 或其他函数通过链向下传递连接。但这会很乏味:)


回复猎户座爱德华兹

'm not sure this would work; In my example the disposable is never actually disposed, we want to close the connection as soon as the operation completes instead

嗯,我很确定 RxSwift 的人会这样做。
如果您得到了结果,无论是 completed 还是 error,您都可以处置观察者。如果你想再做一次,那么你 re-subscribe 它。

您可以使用以下函数之一来处理它:

  • 使用 take 家庭:take(1)takeUntil...
  • 完成后调用 dispose() 观察者。 (气馁)
  • 使用debug()查看观察者何时被处置
  • ...

有两种方法可以做你想做的事"using RxSwift constructs."

Connection.establish(host)
    .flatMap { Observable.combineLatest(Observable.just([=10=]), [=10=].sendData("foo")) }
    .flatMap { connection, _ in connection.close() }
    .subscribe(onNext: { /* all done */ })

或者如果您不介意插入地图,您可以:

Connection.establish(host)
    .flatMap { connection in
        connection.sendData("foo").map { (connection, [=11=]) }
    }
    .flatMap { connection, _ in connection.close() }
    .subscribe(onNext: { /* all done */ })

注意 combineLatestmap 从一开始就在图书馆里。