Scala 中的异步 IO(套接字)
Asynchronous IO (socket) in Scala
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}
import java.net.InetSocketAddress
import scala.concurrent.{Future, blocking}
class Master {
val server: AsynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()
server.bind(new InetSocketAddress("localhost", port))
val client: Future[AsynchronousSocketChannel] = Future { blocking { server.accept().get() } }
}
这是我正在尝试的伪代码。
在问这个问题之前,我搜索了一下,找到了一个相关的answer。
在她的回答中,我想知道这是什么意思:"If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library."
因为我不想遭受 running out of memory
(使用 blocking
时的情况)或 thread pool hell
(相反情况)的困扰,她的回答正是我一直在寻找的向前。但是,正如您在我的伪代码中看到的那样,由于 blocking
原因,将为每个客户端创建一个新线程(为了简单起见,我只创建了一个客户端),即使我像她说的那样使用了 NIO。
- 请用一个简单的例子来解释她的建议。
- 在 Scala 中尝试异步 io 时,我的伪代码是否是一种合适的方法,或者是否有更好的替代方法?
问题 1 的答案
她提出了两点建议
a) 如果您使用带有阻塞调用的 future,则使用 scala.concurrent.blocking
。 blocking
告诉默认执行上下文生成临时线程以停止饥饿。
假设 blockingServe()
确实阻塞了。要执行多个 blockingServe
,我们使用 Futures。
Future {
blockingServe() //blockingServe() could be serverSocket.accept()
}
但是上面的代码导致了事件模型中的饥饿。为了应对饥饿。我们必须要求执行上下文创建新的临时线程来处理额外的请求。这是使用 scala.concurrent.blocking
传达给执行上下文的
Future {
blocking {
blockingServe() //blockingServe() could be serverSocket.accept()
}
}
b)
我们还没有实现非阻塞。代码仍然阻塞但在不同的线程中阻塞(异步)
如何实现真正的非阻塞?
我们可以使用non-blocking api
实现真正的非阻塞。
因此,在您的情况下,您必须使用 java.nio.channels.ServerSocketChannel
才能获得真正的非阻塞模型。
请注意,在您的代码片段中,您混合了 a) 和 b),这不是必需的
问题 2 的答案
val selector = Selector.open()
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
serverChannel.socket().bind(new InetSocketAddress("192.168.2.1", 5000))
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
def assignWork[A](serverChannel: ServerSocketChannel, selector: Selector, work: => Future[A]) = {
work
//recurse
}
assignWork[Unit](serverChannel, selector, Future(()))
}
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}
import java.net.InetSocketAddress
import scala.concurrent.{Future, blocking}
class Master {
val server: AsynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()
server.bind(new InetSocketAddress("localhost", port))
val client: Future[AsynchronousSocketChannel] = Future { blocking { server.accept().get() } }
}
这是我正在尝试的伪代码。
在问这个问题之前,我搜索了一下,找到了一个相关的answer。
在她的回答中,我想知道这是什么意思:"If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library."
因为我不想遭受 running out of memory
(使用 blocking
时的情况)或 thread pool hell
(相反情况)的困扰,她的回答正是我一直在寻找的向前。但是,正如您在我的伪代码中看到的那样,由于 blocking
原因,将为每个客户端创建一个新线程(为了简单起见,我只创建了一个客户端),即使我像她说的那样使用了 NIO。
- 请用一个简单的例子来解释她的建议。
- 在 Scala 中尝试异步 io 时,我的伪代码是否是一种合适的方法,或者是否有更好的替代方法?
问题 1 的答案
她提出了两点建议
a) 如果您使用带有阻塞调用的 future,则使用 scala.concurrent.blocking
。 blocking
告诉默认执行上下文生成临时线程以停止饥饿。
假设 blockingServe()
确实阻塞了。要执行多个 blockingServe
,我们使用 Futures。
Future {
blockingServe() //blockingServe() could be serverSocket.accept()
}
但是上面的代码导致了事件模型中的饥饿。为了应对饥饿。我们必须要求执行上下文创建新的临时线程来处理额外的请求。这是使用 scala.concurrent.blocking
Future {
blocking {
blockingServe() //blockingServe() could be serverSocket.accept()
}
}
b)
我们还没有实现非阻塞。代码仍然阻塞但在不同的线程中阻塞(异步)
如何实现真正的非阻塞?
我们可以使用non-blocking api
实现真正的非阻塞。
因此,在您的情况下,您必须使用 java.nio.channels.ServerSocketChannel
才能获得真正的非阻塞模型。
请注意,在您的代码片段中,您混合了 a) 和 b),这不是必需的
问题 2 的答案
val selector = Selector.open()
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
serverChannel.socket().bind(new InetSocketAddress("192.168.2.1", 5000))
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
def assignWork[A](serverChannel: ServerSocketChannel, selector: Selector, work: => Future[A]) = {
work
//recurse
}
assignWork[Unit](serverChannel, selector, Future(()))
}