Android Kotlin Coroutine Channel 消息未在 websocket 回调中发送

Android Kotlin Coroutine Channel message not sended in websocket callback

我正在使用 WebSocket (OkHttp) 开发聊天 android 应用程序

为此,我实现了 okhttp3.WebSocketListener 接口。 我正在从 onMessage 回调方法接收聊天消息。

我已经使用 Rx-PublishSubject 开发了它,并且工作正常。 但是我想改成Coroutine-Channel。

为此,我在 WebSocketListener class 中添加了频道。

@Singleton
class MyWebSocketService @Inject constructor(
    private val ioDispatcher: CoroutineDispatcher
): WebSocketListener() {

  // previous
  val messageSubject: PublishSubject<WsMsg> = PublishSubject.create()

  // new
  val messageChannel: Channel<WsMsg> by lazy { Channel() }

  override fun onMessage(webSocket: WebSocket, text: String) {
    super.onMessage(webSocket, text)

    // previous
    messageSubject.onNext(text)

    // new
    runBlocking(ioDispatcher) {
      Log.d(TAG, "message: $text")
      messageChannel.send(text)
    }
  }
}

但是...协程通道不起作用... 它只接收并打印一次日志。 但是它不会在第二条消息之后打印日志。

但是当我像下面这样更改代码时,它起作用了!

  override fun onMessage(webSocket: WebSocket, text: String) {
    super.onMessage(webSocket, text)

    GlobalScope.launch(ioDispatcher) {
      Log.d(TAG, "message: $text")
      messageChannel.send(text)
    }
  }

区别是 runBlockingGlobalScope。 我认为 GlobakScope 可能无法确保消息的顺序。 所以它不适合聊天应用程序。

我该如何解决这个问题?

默认 Channel() 没有缓冲区,这会导致 send(message) 暂停,直到通道的消费者调用 channel.receive()(这是在 for(element in channel){} 循环中隐式完成的)

由于您使用的是runBlocking,挂起实际上意味着阻塞当前线程。看起来 okhttp 将始终在同一个线程上传递消息,但它不能这样做,因为你仍在阻塞该线程。

正确的解决方案是为您的频道添加一个缓冲区。如果消息涌入的速度不太可能比您处理它们的速度快,您只需将 Channel() 替换为 Channel(Channel.UNLIMITED)