Non-blocking alternative to Object.wait()?

I have a thread that receives packets from a local server:

// Shared object:
@Volatile lateinit var recentPacket: Packet
val recvMutex = Object()

// Thread code:
thread(isDaemon = true) {
    while (true) {
        val packet = readPacket()
        synchronized(recvMutex) {
            recentPacket = packet
            recvMutex.notifyAll()
        }
    }
}

And I have multiple other threads waiting for packets, each of which should get the same packet that was just received:

suspend fun receive(): Packet {
    return synchronized(recvMutex) {
        recvMutex.wait() // non-blocking alternative to this?
        recentPacket
    }
}

It works, but Object.wait() blocks the thread. Is there a way to avoid this?

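Coroutines appear to be in an experimental state; I suggest waiting until they mature before using them. See https://kotlinlang.org/docs/reference/coroutines.html#experimental-status-of-coroutines

In the meantime, you could try a thread pool: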

import java.net.DatagramPacket
import java.net.DatagramSocket
import java.util.concurrent.Executors
import kotlin.concurrent.thread

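fun start() {
    val pool = Executors.newFixedThreadPool(10)
    thread(isDaemon = true) {
        val socket = DatagramSocket(12345)
        while (!socket.isClosed) {
            // Fresh buffer per packet; the length must be the buffer size,
            // otherwise every received datagram is truncated to 0 bytes.
            val buffer = ByteArray(1000)
            val packet = DatagramPacket(buffer, buffer.size)
            socket.receive(packet)
            // Hand the packet to the pool so the receive loop is not held up.
            pool.submit { receive(packet) }
        }
        // Shut the pool down only after the receive loop has ended;
        // shutting it down earlier would reject every submitted task.
        pool.shutdown()
    }
}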

fun receive(packet: DatagramPacket) {
    println(String(packet.data, 0, packet.length))
}

Asynchronous I/O may also be useful; you could look at Java Selectors.
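To give an idea of what the Selector route might look like, here is a minimal sketch, assuming the packets arrive over UDP on the loopback interface. The function names `openChannel` and `pollPackets`, the 1000-byte buffer, and the UTF-8 decoding are my own choices for illustration, not something from the question. Note that `select` still parks one thread while it waits; the benefit is that a single thread can watch many channels.

```kotlin
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Opens a non-blocking UDP channel on the given loopback port (0 = any free port)
// and registers it with the selector for read readiness.
fun openChannel(selector: Selector, port: Int): DatagramChannel {
    val channel = DatagramChannel.open()
    channel.bind(InetSocketAddress("127.0.0.1", port))
    channel.configureBlocking(false)
    channel.register(selector, SelectionKey.OP_READ)
    return channel
}

// Waits up to timeoutMs (must be > 0) for any registered channel to become
// readable and returns the payloads that were read, decoded as UTF-8 strings.
fun pollPackets(selector: Selector, timeoutMs: Long): List<String> {
    val received = mutableListOf<String>()
    if (selector.select(timeoutMs) == 0) return received
    val keys = selector.selectedKeys().iterator()
    while (keys.hasNext()) {
        val key = keys.next()
        keys.remove() // selected keys are not cleared automatically
        if (!key.isReadable) continue
        val buffer = ByteBuffer.allocate(1000)
        // In non-blocking mode receive() returns null when no datagram is available.
        if ((key.channel() as DatagramChannel).receive(buffer) != null) {
            buffer.flip()
            received += Charsets.UTF_8.decode(buffer).toString()
        }
    }
    return received
}
```

You would call `pollPackets` in a loop from a single thread and pass each payload on to whatever is interested in it.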

It works, but Object.wait() blocks the thread. Is there a way to avoid this?

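Yes, but it means removing the whole wait-notify idiom you're using now and replacing it with Kotlin's native BroadcastChannel.

Here is a basic example with two receivers and five packets being broadcast: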

import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.channels.BroadcastChannel
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

private val threadPool = Executors.newCachedThreadPool() as ExecutorService
val MyPool = threadPool.asCoroutineDispatcher()

fun main(args: Array<String>) {
    val packetChannel = BroadcastChannel<Packet>(1)
    (1..2).forEach {
        launch(MyPool) {
            receivePackets(it, packetChannel.openSubscription())
        }
    }
    runBlocking {
        (1..5).forEach {
            packetChannel.send(Packet(it))
            delay(100)
        }
    }
    threadPool.shutdown()
}

suspend fun receivePackets(index: Int, packetChannel: SubscriptionReceiveChannel<Packet>) {
    while (true) {
        println("Receiver $index got packet ${packetChannel.receive().value}")
    }
}

data class Packet(
        val value: Int
)

You should see output like this:

Receiver 1 got packet 1
Receiver 2 got packet 1
Receiver 2 got packet 2
Receiver 1 got packet 2
Receiver 1 got packet 3
Receiver 2 got packet 3
Receiver 1 got packet 4
Receiver 2 got packet 4
Receiver 1 got packet 5
Receiver 2 got packet 5
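
If pulling in the coroutines library is not an option, a similar "everyone waiting gets the same next packet" behaviour can probably be approximated with the JDK's CompletableFuture. The following is only a sketch under that assumption: `PacketBroadcaster`, `publish`, and this `receive` are hypothetical names, and `Packet` is redeclared here just so the snippet compiles on its own.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference

// Stand-in for the Packet type used above, repeated to keep the sketch self-contained.
data class Packet(val value: Int)

class PacketBroadcaster {
    // Future that completes with the next packet; replaced on every publish.
    private val next = AtomicReference(CompletableFuture<Packet>())

    // Returns immediately. The returned future completes when the next packet
    // is published, so callers attach callbacks instead of blocking.
    fun receive(): CompletableFuture<Packet> = next.get()

    // Installs a fresh future for later callers, then completes the previous one
    // so that everyone who was waiting on it sees the same packet.
    fun publish(packet: Packet) {
        next.getAndSet(CompletableFuture<Packet>()).complete(packet)
    }
}

fun main() {
    val broadcaster = PacketBroadcaster()
    // Both receivers register interest before the packet arrives.
    broadcaster.receive().thenAccept { println("Receiver 1 got packet ${it.value}") }
    broadcaster.receive().thenAccept { println("Receiver 2 got packet ${it.value}") }
    // The reader thread would call this after readPacket().
    broadcaster.publish(Packet(1))
}
```

The order in which the two callbacks run is not guaranteed. Also, unlike a channel with a buffer, a receiver that is busy between two `publish` calls simply misses the packet in between, which matches the original wait-notify behaviour.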