Spring 集成动态流线程问题
Spring Integration Dynamic Flows threads issue
我正在根据数据库中的数据创建动态集成流。
我们需要用文件名模式轮询的目录在数据库中
例如
Instance, directory , filename
ABC , c:/input1 , test.txt
DEF , d:/input2 , fresh.xlsx
我有大约 200-300 个条目,所以我正在为每条记录创建集成流程,因为它会有不同的处理器等
每条记录
IntegrationFlowBuilder flowBuilder =
IntegrationFlows
.from(new CustomFileReadingSource(input), consumer);
flowBuilder.transform(new ObjectToJsonTransformer());
flowBuilder.handle(o -> {
// System.out.println(o.getPayload());
});
context.registration(flowBuilder.get()).register();
所有这些都注册后,但是当我查看 VisualVM 或日志时,我只看到 8-10 个线程,而不是 100 或 200。
来自日志
2018-11-13 16:00:41.399 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.587 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.807 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.071 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.323 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.569 [task-scheduler-6] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.878 [task-scheduler-8] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.197 [task-scheduler-9] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.588 [task-scheduler-1] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.951 [task-scheduler-2] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.305 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.598 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.031 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.414 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.974 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
如您所见,只有几个线程轮询线程
有人可以帮助解释为什么它不创建线程或实现并行轮询器的更好方法吗?
默认调度程序只有 10 个线程。通常,定时任务很短运行,在这种情况下,这通常就足够了;如果不是,则增加线程数或添加任务执行器,以便调度程序将工作交给另一个线程。
没错。因为所有 Polling Ednpoints
都基于预定义的全局 ThreadPoolTaskScheduler
,默认情况下 10
作为池大小:https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler
另一方面,在您的 CPU 最多 16 个内核的同时尝试拥有 100 个线程真的毫无意义。创建更多线程甚至可能会导致应用程序变慢。
我正在根据数据库中的数据创建动态集成流。
我们需要用文件名模式轮询的目录在数据库中
例如
Instance, directory , filename
ABC , c:/input1 , test.txt
DEF , d:/input2 , fresh.xlsx
我有大约 200-300 个条目,所以我正在为每条记录创建集成流程,因为它会有不同的处理器等
每条记录
IntegrationFlowBuilder flowBuilder =
IntegrationFlows
.from(new CustomFileReadingSource(input), consumer);
flowBuilder.transform(new ObjectToJsonTransformer());
flowBuilder.handle(o -> {
// System.out.println(o.getPayload());
});
context.registration(flowBuilder.get()).register();
所有这些都注册后,但是当我查看 VisualVM 或日志时,我只看到 8-10 个线程,而不是 100 或 200。
来自日志
2018-11-13 16:00:41.399 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.587 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.807 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.071 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.323 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.569 [task-scheduler-6] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.878 [task-scheduler-8] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.197 [task-scheduler-9] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.588 [task-scheduler-1] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.951 [task-scheduler-2] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.305 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.598 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.031 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.414 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.974 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
如您所见,只有几个线程轮询线程
有人可以帮助解释为什么它不创建线程或实现并行轮询器的更好方法吗?
默认调度程序只有 10 个线程。通常,定时任务很短运行,在这种情况下,这通常就足够了;如果不是,则增加线程数或添加任务执行器,以便调度程序将工作交给另一个线程。
没错。因为所有 Polling Ednpoints
都基于预定义的全局 ThreadPoolTaskScheduler
,默认情况下 10
作为池大小:https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler
另一方面,在您的 CPU 最多 16 个内核的同时尝试拥有 100 个线程真的毫无意义。创建更多线程甚至可能会导致应用程序变慢。