Java Akka使用router有并发限制?
Java Akka concurrent limitation in using router?
密码是:
public class TestAkka {
public static void main(String[] args) throws InterruptedException {
ActorSystem system = ActorSystem.create("ExampleRouter", ConfigFactory.load().getConfig("MyRouter"));
ActorRef router = system.actorOf(Props.create(Hello.class).withRouter(new FromConfig()), "exampleRouter");
for (int i = 0; i < 100; i++) {
router.tell(new Website().getNameByIndex(i), router);
}
}
public static class Hello extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
System.out.println("Hello " + message);
URL url = new URL("http://" + message + ":80");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
System.out.println(conn.getResponseCode());
Thread.sleep(10000); // <-- Sim the job take a short time
} else {
unhandled(message);
}
}
}
}
application.conf是:
MyRouter{
akka {
actor {
deployment {
/exampleRouter {
router = round-robin-pool
nr-of-instances = 100
}
}
}
}
}
结果是我只能看到 8 个并发作业每次 运行ning,但我的期望是 100 个并发作业应该同时 运行!还需要什么设置吗?
于 2016 年 6 月 6 日更新:
我已经修改了我的代码,结果是我期望覆盖 application.conf,它现在可以同时 运行 100 个并发作业。事实上,如何为高并发应用优化default-dispatcher?
String s = ""
+ "akka {\n"
+ " actor {\n"
+ " deployment {\n"
+ " /router {\n"
+ " router = round-robin-pool\n"
+ " nr-of-instances = 10000\n"
+ " }\n"
+ " }\n"
+ " default-dispatcher {\n"
+ " fork-join-executor {\n"
+ " parallelism-min = 200\n"
+ " parallelism-max = 5000\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
ActorSystem as = ActorSystem.create("as", ConfigFactory.parseString(s));
ActorRef ar = as.actorOf(Props.create(Hello.class).withRouter(new FromConfig()), "router");
您对 Thread.sleep
的调用阻塞了线程,因此您 运行 正在退出线程。如果您想查看全部 100 个,您应该 运行 在它们自己的专用线程上进行阻塞操作。
您对参与者和调度程序线程的数量感到困惑:
路由器中的参与者数量:它是在内存中创建的实例数量,这些实例将按照选定的逻辑处理到达路由器的消息。
Dispatcher线程:Dispatcher是一个线程池(或执行器服务),负责管理线程从actor邮箱获取消息并执行receive
方法。
您的系统中发生的并发任务的最大数量将受到调度程序配置的限制。拥有路由器(因此有更多的参与者来处理消息)将使这些消息由调度程序中的线程并发处理。
我建议阅读更多有关 akka 调度程序的信息,尤其是默认调度程序:http://doc.akka.io/docs/akka/current/scala/dispatchers.html
密码是:
public class TestAkka {
public static void main(String[] args) throws InterruptedException {
ActorSystem system = ActorSystem.create("ExampleRouter", ConfigFactory.load().getConfig("MyRouter"));
ActorRef router = system.actorOf(Props.create(Hello.class).withRouter(new FromConfig()), "exampleRouter");
for (int i = 0; i < 100; i++) {
router.tell(new Website().getNameByIndex(i), router);
}
}
public static class Hello extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
System.out.println("Hello " + message);
URL url = new URL("http://" + message + ":80");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
System.out.println(conn.getResponseCode());
Thread.sleep(10000); // <-- Sim the job take a short time
} else {
unhandled(message);
}
}
}
}
application.conf是:
MyRouter{
akka {
actor {
deployment {
/exampleRouter {
router = round-robin-pool
nr-of-instances = 100
}
}
}
}
}
结果是我只能看到 8 个并发作业每次 运行ning,但我的期望是 100 个并发作业应该同时 运行!还需要什么设置吗?
于 2016 年 6 月 6 日更新: 我已经修改了我的代码,结果是我期望覆盖 application.conf,它现在可以同时 运行 100 个并发作业。事实上,如何为高并发应用优化default-dispatcher?
String s = ""
+ "akka {\n"
+ " actor {\n"
+ " deployment {\n"
+ " /router {\n"
+ " router = round-robin-pool\n"
+ " nr-of-instances = 10000\n"
+ " }\n"
+ " }\n"
+ " default-dispatcher {\n"
+ " fork-join-executor {\n"
+ " parallelism-min = 200\n"
+ " parallelism-max = 5000\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
ActorSystem as = ActorSystem.create("as", ConfigFactory.parseString(s));
ActorRef ar = as.actorOf(Props.create(Hello.class).withRouter(new FromConfig()), "router");
您对 Thread.sleep
的调用阻塞了线程,因此您 运行 正在退出线程。如果您想查看全部 100 个,您应该 运行 在它们自己的专用线程上进行阻塞操作。
您对参与者和调度程序线程的数量感到困惑:
路由器中的参与者数量:它是在内存中创建的实例数量,这些实例将按照选定的逻辑处理到达路由器的消息。
Dispatcher线程:Dispatcher是一个线程池(或执行器服务),负责管理线程从actor邮箱获取消息并执行
receive
方法。
您的系统中发生的并发任务的最大数量将受到调度程序配置的限制。拥有路由器(因此有更多的参与者来处理消息)将使这些消息由调度程序中的线程并发处理。
我建议阅读更多有关 akka 调度程序的信息,尤其是默认调度程序:http://doc.akka.io/docs/akka/current/scala/dispatchers.html