从不暂停 IO 功能 return

suspend IO function never return

我在为 Android 编写 UDP 消息接收循环时遇到困难。

在下面的代码中,在 receiveLoop 中,对 receiveMessages 的调用从不 returns,因此我从不进入消息处理循环。 请注意,我仍然能够接收数据包,但当通道缓冲区已满时它会停止。

我希望 receiveMessages 立即变为 return,而其中的阻塞 IO 循环仍将永远 运行。

class MySocketUDP(private val params: SocketParams) {


    private val rcvSocket: DatagramSocket by lazy {
        val sock = DatagramSocket(params.rcvPort)
        sock.reuseAddress = true
        sock.soTimeout = 1000
        sock
    }

    suspend fun receiveMessages(channel: SendChannel<Message>) {
        withContext(Dispatchers.IO) {
            val buf = ByteArray(MAX_MSG_SIZE)
            while (true) {
                val pkt = DatagramPacket(buf, buf.size)
                try {
                    if (channel.isClosedForSend) {
                        break
                    }
                    rcvSocket.receive(pkt)
                    val msg = packetToMessage(buf, 0, pkt.length)
                    Log.d("SOCKET", "filling channel with $msg")
                    channel.send(msg)
                } catch (ex: SocketTimeoutException) {
                } catch (ex: CancellationException) {
                    break
                }
            }
        }
    }
}

class MyModel {

    private suspend fun receiveLoop(socket: MySocketUDP) {
        withContext(Dispatchers.Main) {
            val channel = Channel<Message>(16)
            socket.receiveMessages(channel)
            Log.d("MODEL", "Entering msg loop")
            for (msg in channel) {
                dispatchRcvMessage(msg)
            }
        }
    }

}

  1. 为什么 receiveMessages 在 运行 正在 IO 调度程序中并从 Main 调度程序调用时从不 return?
  2. 我真的需要为这样的 producer/consumer 工作生成一个线程吗?
  3. 您能展示如何以“协程友好”的方式很好地实现如此长的阻塞代码吗?

谢谢

  1. receiveMessages() 是一个挂起函数,它调用另一个挂起函数 withContext(),而后者又具有无限循环。因此调用 socket.receiveMessages(channel) 将在循环未完成时暂停代码执行。

  2. 您需要为消费者和生产者启动单独的协程,例如使用 launch 函数。

  3. 一些使用协程的例子:

     val someScope = CoroutineScope(Dispatchers.Main)
    
     private suspend fun receiveLoop(socket: MySocketUDP) = someScope.launch {
         val channel = Channel<Message>(16)
         socket.receiveMessages(channel)
         Log.d("MODEL", "Entering msg loop")
         for (msg in channel) {
             dispatchRcvMessage(msg)
         }
     }
    
     // In MySocketUDP
     suspend fun receiveMessages(channel: SendChannel<Message>) {
         someAnotherScope.launch { // or can use coroutineScope builder function
             val buf = ByteArray(MAX_MSG_SIZE)
             while (true) {
                 val pkt = DatagramPacket(buf, buf.size)
                 try {
                     if (channel.isClosedForSend) {
                         break
                     }
                     rcvSocket.receive(pkt)
                     val msg = packetToMessage(buf, 0, pkt.length)
                     Log.d("SOCKET", "filling channel with $msg")
                     channel.send(msg)
                 } catch (ex: SocketTimeoutException) {
                 } catch (ex: CancellationException) {
                     break
                 }
             }
         }
     }