生产<Type> 与频道<Type>()
produce<Type> vs Channel<Type>()
正在尝试了解渠道。我想引导 android BluetoothLeScanner。为什么这样做:
fun startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> {
val channel = Channel<ScanResult>()
scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
channel.offer(result)
}
}
scanner.startScan(filters, settings, scanCallback)
return channel
}
但不是这个:
fun startScan(scope: CoroutineScope, filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = scope.produce {
scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
offer(result)
}
}
scanner.startScan(filters, settings, scanCallback)
}
它告诉我 Channel was closed
什么时候它想第一次调用 offer
。
EDIT1: 根据文档:The channel is closed when the coroutine completes.
这是有道理的。我知道我们可以使用 suspendCoroutine
和 resume
一次性替换 callback
。然而,这是 listener/stream-situation。我不希望协程完成
使用 produce
,您可以为您的频道引入范围。这意味着,可以取消生成通过通道流式传输的项目的代码。
这也意味着您频道的生命周期从 produce
的 lambda 开始,到此 lambda 结束时结束。
在您的示例中,produce
调用的 lambda 几乎立即结束,这意味着您的频道几乎立即关闭。
将您的代码更改为如下内容:
fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
offer(result)
}
}
scanner.startScan(filters, settings, scanCallback)
// now suspend this lambda forever (until its scope is canceled)
suspendCancellableCoroutine<Nothing> { cont ->
cont.invokeOnCancellation {
scanner.stopScan(...)
}
}
}
...
val channel = scope.startScan(filter)
...
...
scope.cancel() // cancels the channel and stops the scanner.
我添加了行 suspendCancellableCoroutine<Nothing> { ... }
使其暂停 'forever'。
更新:使用 produce
并以结构化方式处理错误(允许结构化并发):
fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
// Suspend this lambda forever (until its scope is canceled)
suspendCancellableCoroutine<Nothing> { cont ->
val scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
offer(result)
}
override fun onScanFailed(errorCode: Int) {
cont.resumeWithException(MyScanException(errorCode))
}
}
scanner.startScan(filters, settings, scanCallback)
cont.invokeOnCancellation {
scanner.stopScan(...)
}
}
}
正在尝试了解渠道。我想引导 android BluetoothLeScanner。为什么这样做:
fun startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> {
val channel = Channel<ScanResult>()
scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
channel.offer(result)
}
}
scanner.startScan(filters, settings, scanCallback)
return channel
}
但不是这个:
fun startScan(scope: CoroutineScope, filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = scope.produce {
scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
offer(result)
}
}
scanner.startScan(filters, settings, scanCallback)
}
它告诉我 Channel was closed
什么时候它想第一次调用 offer
。
EDIT1: 根据文档:The channel is closed when the coroutine completes.
这是有道理的。我知道我们可以使用 suspendCoroutine
和 resume
一次性替换 callback
。然而,这是 listener/stream-situation。我不希望协程完成
使用 produce
,您可以为您的频道引入范围。这意味着,可以取消生成通过通道流式传输的项目的代码。
这也意味着您频道的生命周期从 produce
的 lambda 开始,到此 lambda 结束时结束。
在您的示例中,produce
调用的 lambda 几乎立即结束,这意味着您的频道几乎立即关闭。
将您的代码更改为如下内容:
fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
offer(result)
}
}
scanner.startScan(filters, settings, scanCallback)
// now suspend this lambda forever (until its scope is canceled)
suspendCancellableCoroutine<Nothing> { cont ->
cont.invokeOnCancellation {
scanner.stopScan(...)
}
}
}
...
val channel = scope.startScan(filter)
...
...
scope.cancel() // cancels the channel and stops the scanner.
我添加了行 suspendCancellableCoroutine<Nothing> { ... }
使其暂停 'forever'。
更新:使用 produce
并以结构化方式处理错误(允许结构化并发):
fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
// Suspend this lambda forever (until its scope is canceled)
suspendCancellableCoroutine<Nothing> { cont ->
val scanCallback = object : ScanCallback() {
override fun onScanResult(callbackType: Int, result: ScanResult) {
offer(result)
}
override fun onScanFailed(errorCode: Int) {
cont.resumeWithException(MyScanException(errorCode))
}
}
scanner.startScan(filters, settings, scanCallback)
cont.invokeOnCancellation {
scanner.stopScan(...)
}
}
}