Non-blocking alternative to Object.wait()?
I have a thread that receives packets from a local server:
// Shared object:
@Volatile lateinit var recentPacket: Packet
val recvMutex = Object()

// Thread code:
thread(isDaemon = true) {
    while (true) {
        val packet = readPacket()
        synchronized(recvMutex) {
            recentPacket = packet
            recvMutex.notifyAll()
        }
    }
}
And I have multiple other threads waiting for packets, each of which should get the same packet that was just received:
suspend fun receive(): Packet {
    return synchronized(recvMutex) {
        recvMutex.wait() // non-blocking alternative to this?
        recentPacket
    }
}
It works, but Object.wait() blocks the thread. Is there a way to avoid this?
Coroutines appear to be experimental; I'd suggest waiting until they mature before using them. See https://kotlinlang.org/docs/reference/coroutines.html#experimental-status-of-coroutines
In the meantime, you could try a thread pool:
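import java.net.DatagramPacket
import java.net.DatagramSocket
import java.util.concurrent.Executors
import kotlin.concurrent.thread

fun start() {
    val pool = Executors.newFixedThreadPool(10)
    thread(isDaemon = true) {
        val socket = DatagramSocket(12345)
        try {
            while (!socket.isClosed) {
                // Fresh buffer per packet so pool workers never share data.
                val buffer = ByteArray(1000)
                // The length must be the buffer size; a length of 0 would receive nothing.
                val packet = DatagramPacket(buffer, buffer.size)
                socket.receive(packet) // blocks only this receiver thread
                pool.submit { receive(packet) }
            }
        } finally {
            // Shut the pool down only once the receive loop has ended;
            // shutting it down earlier would reject every later submit.
            pool.shutdown()
        }
    }
}

fun receive(packet: DatagramPacket) {
    println(String(packet.data, 0, packet.length))
}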
Asynchronous I/O may also be useful; you could take a look at Java Selectors.
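To make the Selector suggestion a bit more concrete, here is a minimal sketch, not a definitive implementation. It assumes UDP over the loopback interface, and `receiveOnce`, `demo`, and the 2-second timeout are hypothetical choices made up for this example. Note that `select()` still blocks, but only the one I/O thread, which can watch many channels at once.

```kotlin
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Hypothetical helper: waits up to timeoutMs for a readable channel and
// returns the datagram payload as text, or null if nothing arrived.
fun receiveOnce(selector: Selector, timeoutMs: Long = 2000): String? {
    val buffer = ByteBuffer.allocate(1000)
    if (selector.select(timeoutMs) == 0) return null
    val keys = selector.selectedKeys().iterator()
    while (keys.hasNext()) {
        val key = keys.next()
        keys.remove() // selected keys must be removed by hand
        if (key.isReadable) {
            // In non-blocking mode receive() returns null when no datagram is ready.
            val source = (key.channel() as DatagramChannel).receive(buffer)
            if (source != null) {
                buffer.flip()
                return Charsets.UTF_8.decode(buffer).toString()
            }
        }
    }
    return null
}

// Hypothetical demo: binds a channel to a free loopback port, sends one
// datagram to it, and reads it back through the selector.
fun demo(): String? = Selector.open().use { selector ->
    DatagramChannel.open().use { channel ->
        channel.bind(InetSocketAddress("127.0.0.1", 0)) // port 0 = any free port
        channel.configureBlocking(false) // required before register()
        channel.register(selector, SelectionKey.OP_READ)
        DatagramChannel.open().use { sender ->
            sender.send(ByteBuffer.wrap("hello".toByteArray()), channel.localAddress)
        }
        receiveOnce(selector)
    }
}

fun main() {
    println(demo())
}
```

In a real server you would register every channel you care about with the same selector and call `receiveOnce` (or an equivalent loop) from a single thread.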
It works, but Object.wait() blocks the thread. Is there a way to avoid this?
Yes, but it means dropping the whole wait-notify idiom you are using now and replacing it with Kotlin's native BroadcastChannel.
Here is a basic example with two receivers and five packets being broadcast:
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.channels.BroadcastChannel
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

private val threadPool = Executors.newCachedThreadPool() as ExecutorService
val MyPool = threadPool.asCoroutineDispatcher()

fun main(args: Array<String>) {
    val packetChannel = BroadcastChannel<Packet>(1)
    (1..2).forEach { index ->
        // Subscribe before launching so that no receiver can miss the first packet.
        val subscription = packetChannel.openSubscription()
        launch(MyPool) {
            receivePackets(index, subscription)
        }
    }
    runBlocking {
        (1..5).forEach {
            packetChannel.send(Packet(it))
            delay(100)
        }
    }
    threadPool.shutdown()
}

// Suspends (instead of blocking a thread) until the next packet is broadcast.
suspend fun receivePackets(index: Int, packetChannel: SubscriptionReceiveChannel<Packet>) {
    while (true) {
        println("Receiver $index got packet ${packetChannel.receive().value}")
    }
}

data class Packet(
    val value: Int
)
Expect to see output like this (the order of the two receivers for a given packet may vary):
Receiver 1 got packet 1
Receiver 2 got packet 1
Receiver 2 got packet 2
Receiver 1 got packet 2
Receiver 1 got packet 3
Receiver 2 got packet 3
Receiver 1 got packet 4
Receiver 2 got packet 4
Receiver 1 got packet 5
Receiver 2 got packet 5
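To show what "suspending instead of blocking" amounts to, here is a rough sketch of a suspending analogue of wait()/notifyAll() built only on the Kotlin standard library. This is not how BroadcastChannel is implemented; `NextValue`, `await`, `publish`, and `demo` are hypothetical names made up for this example, and it ignores cancellation, which a real implementation would need to handle.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: callers suspend until the next value is published.
class NextValue<T> {
    private val lock = Any()
    private val waiters = mutableListOf<Continuation<T>>()

    // Suspends the coroutine without blocking a thread: the continuation is
    // parked in the list and the calling thread is free to do other work.
    suspend fun await(): T = suspendCoroutine { cont ->
        synchronized(lock) { waiters.add(cont) }
    }

    // Analogue of notifyAll(): every parked waiter is resumed with the same value.
    fun publish(value: T) {
        val toResume = synchronized(lock) {
            val copy = waiters.toList()
            waiters.clear()
            copy
        }
        // Resume outside the lock so waiters never run while it is held.
        toResume.forEach { it.resume(value) }
    }
}

// Hypothetical demo: two coroutines wait for the same packet on one thread.
fun demo(): List<String> {
    val next = NextValue<String>()
    val received = mutableListOf<String>()
    for (id in 1..2) {
        val receiver: suspend () -> Unit = {
            received.add("receiver $id got ${next.await()}")
        }
        // Runs the coroutine on the current thread until it suspends in await().
        receiver.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    }
    next.publish("packet-1")
    return received
}

fun main() {
    println(demo())
}
```

Both receivers run on the calling thread and neither ever blocks it: each one parks its continuation in `await()` and is resumed from inside `publish()`. As with Object.wait(), a receiver only sees values published after it started waiting.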