运行 发射器响应后按顺序执行多个任务
Run multiple tasks sequentially after emitter response
我正在尝试为总是有一些延迟响应的硬件设备创建一个通信控制器。如果我只请求一个值,我可以创建一个 Single<ByteArray>
并在 .subscribe{ ...}
中进行最终转换。
但是当我请求多个值时,我需要确保第二个请求在第一个请求完全关闭之后发生。
这是我可以用 RxJava 做的事情吗,例如defer?或者我应该自己创建一个队列并使用我的队列手动处理事件序列?
无论如何我们都在使用 RxJava(我显然是新手),当然,如果将它用于此目的也很好。但这是一个好的用例吗?
编辑:
我可以使用但不够通用的代码:
hardware.write(byteArray)
.subscribe(
{
hardware.receiveResult().take(1)
.doFinally { /* dispose code */ }
.subscribe(
{ /* onSuccess */ }
{ /* onError */ }
.let { disposable = it }
},
{ /* onError */ }
)
队列中下一个请求的所有代码都可以放在内部 onSuccess
中,然后是 onSuccess
中的下一个。这将按顺序执行,但不够通用。提出请求的任何其他 class 最终都会破坏我的序列。
我正在寻找一种在硬件通信控制器中自动建立队列的解决方案class。
好久不见,项目开发出来了,早就解决了。现在我想在这里分享它:
fun writeSequential(data1: ByteArray, data2: ByteArray) {
disposable = hardwareWrite(data1)
.zipWith(hardwareWrite(data2))
.subscribe(
{
/* handle results.
it.first will be the first response,
it.second the second. */
},
{ /* handle error */ }
)
compositeDisposable.add(disposable)
}
fun hardwareWrite(data: ByteArray): Disposable {
var emitter: SingleEmitter<ByteArray>? = null
var single = Single.create<ByteArray> { emitter = it }
return hardware.write(data)
.subscribe(
{ hardwareRead(emitter) },
{ /* onError */ }
))
}
fun hardwareRead(emitter: SingleEmitter<ByteArray>): Disposable {
return hardware.receiveResult()
.take(1)
.timeout( /* your timeout */ )
.single( /* default value */ )
.doFinally( /* cleanup queue */ )
.subscribe(
{ emitter.onSuccess(it) }
{ emitter.onError(it) }
)
}
解决方案并不完美,现在我看到中间部分对一次性结果没有任何作用。
同样在示例中,它有点复杂,因为 hardwareWrite 不会立即触发,而是会排队。这样我们可以确保硬件是按顺序访问的,结果不会混淆。
我仍然希望这可能对正在寻找解决方案并且可能是 kotlin 新手的人有所帮助 and/or RxJava 的东西(就像我在项目开始时一样)。
我正在尝试为总是有一些延迟响应的硬件设备创建一个通信控制器。如果我只请求一个值,我可以创建一个 Single<ByteArray>
并在 .subscribe{ ...}
中进行最终转换。
但是当我请求多个值时,我需要确保第二个请求在第一个请求完全关闭之后发生。
这是我可以用 RxJava 做的事情吗,例如defer?或者我应该自己创建一个队列并使用我的队列手动处理事件序列?
无论如何我们都在使用 RxJava(我显然是新手),当然,如果将它用于此目的也很好。但这是一个好的用例吗?
编辑:
我可以使用但不够通用的代码:
hardware.write(byteArray)
.subscribe(
{
hardware.receiveResult().take(1)
.doFinally { /* dispose code */ }
.subscribe(
{ /* onSuccess */ }
{ /* onError */ }
.let { disposable = it }
},
{ /* onError */ }
)
队列中下一个请求的所有代码都可以放在内部 onSuccess
中,然后是 onSuccess
中的下一个。这将按顺序执行,但不够通用。提出请求的任何其他 class 最终都会破坏我的序列。
我正在寻找一种在硬件通信控制器中自动建立队列的解决方案class。
好久不见,项目开发出来了,早就解决了。现在我想在这里分享它:
fun writeSequential(data1: ByteArray, data2: ByteArray) {
disposable = hardwareWrite(data1)
.zipWith(hardwareWrite(data2))
.subscribe(
{
/* handle results.
it.first will be the first response,
it.second the second. */
},
{ /* handle error */ }
)
compositeDisposable.add(disposable)
}
fun hardwareWrite(data: ByteArray): Disposable {
var emitter: SingleEmitter<ByteArray>? = null
var single = Single.create<ByteArray> { emitter = it }
return hardware.write(data)
.subscribe(
{ hardwareRead(emitter) },
{ /* onError */ }
))
}
fun hardwareRead(emitter: SingleEmitter<ByteArray>): Disposable {
return hardware.receiveResult()
.take(1)
.timeout( /* your timeout */ )
.single( /* default value */ )
.doFinally( /* cleanup queue */ )
.subscribe(
{ emitter.onSuccess(it) }
{ emitter.onError(it) }
)
}
解决方案并不完美,现在我看到中间部分对一次性结果没有任何作用。
同样在示例中,它有点复杂,因为 hardwareWrite 不会立即触发,而是会排队。这样我们可以确保硬件是按顺序访问的,结果不会混淆。
我仍然希望这可能对正在寻找解决方案并且可能是 kotlin 新手的人有所帮助 and/or RxJava 的东西(就像我在项目开始时一样)。