RxSwift 通过 flatMaps 链传播一个值
RxSwift propagating a value through a chain of flatMaps
所以我有一些 RxSwift 代码,我想在其中执行一系列异步操作,所有这些操作都是使用可观察对象组成的。 flatMap
是执行此操作的方法,并且效果很好,但是它似乎无法将变量传递到我能弄清楚的链中。一些伪代码最好地说明了这一点
假设有 3 个函数
class Connection {
static func establish(address:String) -> Observable<Connection>
func sendData(data:String) -> Observable<Int> // num bytes written or something
func close() -> Observable<Void>
}
我想在链中调用它们,以便我们连接、发送,然后关闭。像这样
Connection.establish(host)
.flatMap{ connection in connection.sendData("foo") }
.flatMap{ numBytes in ????.close() }
.subscribeNext{ /* all done */ }
问题是 flatMap
没有将它的输入参数传递到链下,因此传递给 subscribeNext
的闭包无法访问 connection
对象,因此它不能调用关闭。
我可以像下面那样做一些可怕的黑客攻击,但我真的不想这样做!
var connection:Connection?
Connection.establish(host)
.flatMap{ c in
connection = c
return c.sendData("foo")
}
.flatMap{ numBytes in connection!.close() }
.subscribeNext{ /* all done */ }
在 Rx 的 C# 版本中,这是通过对 SelectMany
的重载来解决的,它采用第二个闭包,将 2 个值(通常合并到一个元组中)然后 事物沿着链传播。我把它写成 RxSwfit 的扩展,它的工作原理如下:
Connection.establish(host)
.flatMap(
{ connection in connection.sendData("foo") },
combine: { ([=13=], ) }) // tupleify
.flatMap{ (connection, numbytes) in connection.close() }
.subscribeNext{ /* all done */ }
这一切都很好,但我的主要问题是 - 是否有更好的方法来做到这一点,它目前内置于 RxSwift 中?
另外,写这个扩展方法并不简单也不容易。我基本上通过 copy/pasting MiniRxSwift 中的那个从头开始重新实现 FlatMap 并修改它。如果我们必须编写这个扩展,是否有更好的方法使用 RxSwift 构造来实现它?
您可以在 establish
函数中执行此操作。它可能看起来像这样:
static func establish(address:String) -> Observable<Connection> {
return Observable.create { observer in
//create connection
observer.onNext(connection)
observer.onCompleted()
return AnonymousDisposable { connection.close() }
}
}
当你的 observer
被处理时,它也会关闭你的连接。
我认为这是这里最好的选择之一。
但是,我们可以使用 combineLatest
或 flatMap
或其他函数通过链向下传递连接。但这会很乏味:)
回复猎户座爱德华兹
'm not sure this would work; In my example the disposable is never
actually disposed, we want to close the connection as soon as the
operation completes instead
嗯,我很确定 RxSwift 的人会这样做。
如果您得到了结果,无论是 completed
还是 error
,您都可以处置观察者。如果你想再做一次,那么你 re-subscribe
它。
您可以使用以下函数之一来处理它:
- 使用
take
家庭:take(1)
、takeUntil
...
- 完成后调用
dispose()
观察者。 (气馁)
- 使用
debug()
查看观察者何时被处置
- ...
有两种方法可以做你想做的事"using RxSwift constructs."
Connection.establish(host)
.flatMap { Observable.combineLatest(Observable.just([=10=]), [=10=].sendData("foo")) }
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
或者如果您不介意插入地图,您可以:
Connection.establish(host)
.flatMap { connection in
connection.sendData("foo").map { (connection, [=11=]) }
}
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
注意 combineLatest
和 map
从一开始就在图书馆里。
所以我有一些 RxSwift 代码,我想在其中执行一系列异步操作,所有这些操作都是使用可观察对象组成的。 flatMap
是执行此操作的方法,并且效果很好,但是它似乎无法将变量传递到我能弄清楚的链中。一些伪代码最好地说明了这一点
假设有 3 个函数
class Connection {
static func establish(address:String) -> Observable<Connection>
func sendData(data:String) -> Observable<Int> // num bytes written or something
func close() -> Observable<Void>
}
我想在链中调用它们,以便我们连接、发送,然后关闭。像这样
Connection.establish(host)
.flatMap{ connection in connection.sendData("foo") }
.flatMap{ numBytes in ????.close() }
.subscribeNext{ /* all done */ }
问题是 flatMap
没有将它的输入参数传递到链下,因此传递给 subscribeNext
的闭包无法访问 connection
对象,因此它不能调用关闭。
我可以像下面那样做一些可怕的黑客攻击,但我真的不想这样做!
var connection:Connection?
Connection.establish(host)
.flatMap{ c in
connection = c
return c.sendData("foo")
}
.flatMap{ numBytes in connection!.close() }
.subscribeNext{ /* all done */ }
在 Rx 的 C# 版本中,这是通过对 SelectMany
的重载来解决的,它采用第二个闭包,将 2 个值(通常合并到一个元组中)然后 事物沿着链传播。我把它写成 RxSwfit 的扩展,它的工作原理如下:
Connection.establish(host)
.flatMap(
{ connection in connection.sendData("foo") },
combine: { ([=13=], ) }) // tupleify
.flatMap{ (connection, numbytes) in connection.close() }
.subscribeNext{ /* all done */ }
这一切都很好,但我的主要问题是 - 是否有更好的方法来做到这一点,它目前内置于 RxSwift 中?
另外,写这个扩展方法并不简单也不容易。我基本上通过 copy/pasting MiniRxSwift 中的那个从头开始重新实现 FlatMap 并修改它。如果我们必须编写这个扩展,是否有更好的方法使用 RxSwift 构造来实现它?
您可以在 establish
函数中执行此操作。它可能看起来像这样:
static func establish(address:String) -> Observable<Connection> {
return Observable.create { observer in
//create connection
observer.onNext(connection)
observer.onCompleted()
return AnonymousDisposable { connection.close() }
}
}
当你的 observer
被处理时,它也会关闭你的连接。
我认为这是这里最好的选择之一。
但是,我们可以使用 combineLatest
或 flatMap
或其他函数通过链向下传递连接。但这会很乏味:)
回复猎户座爱德华兹
'm not sure this would work; In my example the disposable is never actually disposed, we want to close the connection as soon as the operation completes instead
嗯,我很确定 RxSwift 的人会这样做。
如果您得到了结果,无论是 completed
还是 error
,您都可以处置观察者。如果你想再做一次,那么你 re-subscribe
它。
您可以使用以下函数之一来处理它:
- 使用
take
家庭:take(1)
、takeUntil
... - 完成后调用
dispose()
观察者。 (气馁) - 使用
debug()
查看观察者何时被处置 - ...
有两种方法可以做你想做的事"using RxSwift constructs."
Connection.establish(host)
.flatMap { Observable.combineLatest(Observable.just([=10=]), [=10=].sendData("foo")) }
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
或者如果您不介意插入地图,您可以:
Connection.establish(host)
.flatMap { connection in
connection.sendData("foo").map { (connection, [=11=]) }
}
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
注意 combineLatest
和 map
从一开始就在图书馆里。