优先调度周期性线程
scheduling periodical threads with priority
我必须在 java 8 中实现一种多线程应用程序。
我的应用程序应该定期 运行 一些线程,假设每 30 分钟一次。每个线程都应该调用一个api,从中获取数据并保存到数据库中。数据库是这样组织的:
Table A
Table B
Table C
Table D
Table E
Table F
Table G
Table H
Tables F
、G
和 H
具有指向 Tables A
、B
、C
、D
和E
.
这导致每个 table 有一个 Thread
更新:
Thread A -> Table A
Thread B -> Table B
Thread C -> Table C
Thread D -> Table D
Thread E -> Table E
Thread F -> Table F
Thread G -> Table G
Thread H -> Table H
由于某些 table 具有其他 table 的外键,线程不应同时启动所有线程,而应首先启动 Thread A
、B
、C
和 E
,一旦完成,就应该开始 Thread F
、G
和 H
。
我在互联网上搜索了很多可能的解决方案,但没有找到适合我情况的解决方案。
目前,我已经使用 ScheduledThreadPoolExecutor
实现了我的代码,该代码具有 scheduleWithFixedDelay
函数,确保线程每 30 分钟执行一次。即使我安排每个线程具有不同的优先级,它们都会同时启动,并且在 table 中使用外键执行 insert/update 时出现异常。
这是我的代码的快照,可能有助于理解。
Thread-like
类 看起来像这样:
public class AllarmReasonService implements Runnable {
@Override
public void run() {
AllarmReasonDAO dao = new AllarmReasonDAO();
PagedResultContainer<AllarmReason> allarmReasonContainer = AllarmReasonApi.getInstance().getAllarmReasons();
for(Iterator<AllarmReason> iterator = allarmReasonContainer.getData().iterator(); iterator.hasNext();){
AllarmReason allarmReason = iterator.next();
dao.insertOrUpdateAllarmReason(allarmReason);
}
}
}
threadExecutor
:
public void threadExecutor(){
initializeThreadMap();
Stream<Entry<String, Integer>> sorted = threadTimerMap.entrySet().stream().sorted(Entry.comparingByValue());
scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadTimerMap.size());
for(Iterator<Entry<String, Integer>> entryIterator = sorted.iterator(); entryIterator.hasNext();){
Entry<String, Integer> entry = entryIterator.next();
Runnable service = threadMap.get(entry.getKey());
Thread thread = new Thread(service);
thread.setPriority(entry.getValue());
scheduler.scheduleWithFixedDelay(thread, 0, 30 * 60 * 1000, TimeUnit.MILLISECONDS);
}
}
private void initializeThreadMap(){
threadMap.clear();
threadMap.put("causaliAllarme", new AllarmReasonService());
threadMap.put("causaliLavorazione", new ManufactureReasonService());
threadMap.put("celle", new CellService());
threadMap.put("dipendenti", new EmployeeService());
threadMap.put("ordiniLavorazione", new ManufactureOrderService());
threadMap.put("parzialiLavorazione", new ManufacturePartialService());
threadMap.put("qualita", new QualityService());
threadMap.put("qualitaRilevata", new QualityDetectionService());
}
最后,threadTimerMap
有 key = name of the module/thread (i.e. causaliAllarme)
和 value = priority
,其中模块优先更新的优先级更高。
使用此设置,即使我设置了特定的优先级,我也无法 运行 首先处理最重要的线程。
考虑到我不是线程和多线程的专家。
如有任何帮助,我们将不胜感激。
提前致谢。
对于这种情况,我建议使用 CompletableFuture
功能。
// Declare your pool. Declare ThreadFactory also, it will be very
// helpful with debug, I promise :)
int corePoolSize = 10;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize,
new ThreadFactoryBuilder().setNameFormat("Your-thread-%d").setDaemon(true).build());
List<CompletableFuture<Void>> dependencies = new ArrayList<>();
// Submit the threads for the first stage
dependencies.add(CompletableFuture.runAsync(new AllarmReasonService(), pool));
dependencies.add(CompletableFuture.runAsync(new ManufactureReasonService(), pool));
// ...
// do the same with all your stage-1 threads
// wait while stage 1 completed
try {
for (CompletableFuture<Void> f : dependencies) {
f.get();
}
} catch (InterruptedException | ExecutionException e) {
// log or re-throw
pool.shutdownNow();
}
// stage 2
CompletableFuture.runAsync(new AllarmReasonService(), pool);
CompletableFuture.runAsync(new ManufactureReasonService(), pool);
// other required ...
您也可以使用 CompletableFuture.allOf(CompletableFuture<?>...)
方法聚合期货并等待单个期货。
希望对您有所帮助!
您可以使用 CountDownLatch
.
将这两个函数连接在一起
这段代码只是说明了这个想法 - 可能有更简洁的方法来实现这一点。
class SignalledRunnable implements Runnable {
final CountDownLatch wait;
final CountDownLatch signal;
final Runnable it;
public SignalledRunnable(CountDownLatch waitFor, CountDownLatch signal, Runnable it) {
this.wait = waitFor;
this.signal = signal;
this.it = it;
}
@Override
public void run() {
if (wait != null) {
try {
// Wait for pre-condition.
wait.await();
} catch (InterruptedException e) {
// Please remember to do something here.
}
}
// Do it.
it.run();
if (signal != null) {
// Signal post-condition.
signal.countDown();
}
}
class A implements Runnable {
@Override
public void run() {
// Update table A
}
}
class F implements Runnable {
@Override
public void run() {
// Update table F.
}
}
public void test() {
// Make F wait for A.
A a = new A();
F f = new F();
CountDownLatch afSequencer = new CountDownLatch(1);
// The A must signal at completion.
Runnable ar = new SignalledRunnable(null, afSequencer, a);
// The F should wait for that signal.
Runnable fr = new SignalledRunnable(afSequencer, null, f);
// You can now add ar and fr to your task queue and f will block until a completes.
}
注意:正如@RealSkeptic 指出的那样,请记住 f
线程将 阻塞 直到 a
完成,因此请确保您的线程池足够大处理系统中可能发生的尽可能多的阻塞线程。
我必须在 java 8 中实现一种多线程应用程序。
我的应用程序应该定期 运行 一些线程,假设每 30 分钟一次。每个线程都应该调用一个api,从中获取数据并保存到数据库中。数据库是这样组织的:
Table A
Table B
Table C
Table D
Table E
Table F
Table G
Table H
Tables F
、G
和 H
具有指向 Tables A
、B
、C
、D
和E
.
这导致每个 table 有一个 Thread
更新:
Thread A -> Table A
Thread B -> Table B
Thread C -> Table C
Thread D -> Table D
Thread E -> Table E
Thread F -> Table F
Thread G -> Table G
Thread H -> Table H
由于某些 table 具有其他 table 的外键,线程不应同时启动所有线程,而应首先启动 Thread A
、B
、C
和 E
,一旦完成,就应该开始 Thread F
、G
和 H
。
我在互联网上搜索了很多可能的解决方案,但没有找到适合我情况的解决方案。
目前,我已经使用 ScheduledThreadPoolExecutor
实现了我的代码,该代码具有 scheduleWithFixedDelay
函数,确保线程每 30 分钟执行一次。即使我安排每个线程具有不同的优先级,它们都会同时启动,并且在 table 中使用外键执行 insert/update 时出现异常。
这是我的代码的快照,可能有助于理解。
Thread-like
类 看起来像这样:
public class AllarmReasonService implements Runnable {
@Override
public void run() {
AllarmReasonDAO dao = new AllarmReasonDAO();
PagedResultContainer<AllarmReason> allarmReasonContainer = AllarmReasonApi.getInstance().getAllarmReasons();
for(Iterator<AllarmReason> iterator = allarmReasonContainer.getData().iterator(); iterator.hasNext();){
AllarmReason allarmReason = iterator.next();
dao.insertOrUpdateAllarmReason(allarmReason);
}
}
}
threadExecutor
:
public void threadExecutor(){
initializeThreadMap();
Stream<Entry<String, Integer>> sorted = threadTimerMap.entrySet().stream().sorted(Entry.comparingByValue());
scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadTimerMap.size());
for(Iterator<Entry<String, Integer>> entryIterator = sorted.iterator(); entryIterator.hasNext();){
Entry<String, Integer> entry = entryIterator.next();
Runnable service = threadMap.get(entry.getKey());
Thread thread = new Thread(service);
thread.setPriority(entry.getValue());
scheduler.scheduleWithFixedDelay(thread, 0, 30 * 60 * 1000, TimeUnit.MILLISECONDS);
}
}
private void initializeThreadMap(){
threadMap.clear();
threadMap.put("causaliAllarme", new AllarmReasonService());
threadMap.put("causaliLavorazione", new ManufactureReasonService());
threadMap.put("celle", new CellService());
threadMap.put("dipendenti", new EmployeeService());
threadMap.put("ordiniLavorazione", new ManufactureOrderService());
threadMap.put("parzialiLavorazione", new ManufacturePartialService());
threadMap.put("qualita", new QualityService());
threadMap.put("qualitaRilevata", new QualityDetectionService());
}
最后,threadTimerMap
有 key = name of the module/thread (i.e. causaliAllarme)
和 value = priority
,其中模块优先更新的优先级更高。
使用此设置,即使我设置了特定的优先级,我也无法 运行 首先处理最重要的线程。
考虑到我不是线程和多线程的专家。
如有任何帮助,我们将不胜感激。 提前致谢。
对于这种情况,我建议使用 CompletableFuture
功能。
// Declare your pool. Declare ThreadFactory also, it will be very
// helpful with debug, I promise :)
int corePoolSize = 10;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize,
new ThreadFactoryBuilder().setNameFormat("Your-thread-%d").setDaemon(true).build());
List<CompletableFuture<Void>> dependencies = new ArrayList<>();
// Submit the threads for the first stage
dependencies.add(CompletableFuture.runAsync(new AllarmReasonService(), pool));
dependencies.add(CompletableFuture.runAsync(new ManufactureReasonService(), pool));
// ...
// do the same with all your stage-1 threads
// wait while stage 1 completed
try {
for (CompletableFuture<Void> f : dependencies) {
f.get();
}
} catch (InterruptedException | ExecutionException e) {
// log or re-throw
pool.shutdownNow();
}
// stage 2
CompletableFuture.runAsync(new AllarmReasonService(), pool);
CompletableFuture.runAsync(new ManufactureReasonService(), pool);
// other required ...
您也可以使用 CompletableFuture.allOf(CompletableFuture<?>...)
方法聚合期货并等待单个期货。
希望对您有所帮助!
您可以使用 CountDownLatch
.
这段代码只是说明了这个想法 - 可能有更简洁的方法来实现这一点。
class SignalledRunnable implements Runnable {
final CountDownLatch wait;
final CountDownLatch signal;
final Runnable it;
public SignalledRunnable(CountDownLatch waitFor, CountDownLatch signal, Runnable it) {
this.wait = waitFor;
this.signal = signal;
this.it = it;
}
@Override
public void run() {
if (wait != null) {
try {
// Wait for pre-condition.
wait.await();
} catch (InterruptedException e) {
// Please remember to do something here.
}
}
// Do it.
it.run();
if (signal != null) {
// Signal post-condition.
signal.countDown();
}
}
class A implements Runnable {
@Override
public void run() {
// Update table A
}
}
class F implements Runnable {
@Override
public void run() {
// Update table F.
}
}
public void test() {
// Make F wait for A.
A a = new A();
F f = new F();
CountDownLatch afSequencer = new CountDownLatch(1);
// The A must signal at completion.
Runnable ar = new SignalledRunnable(null, afSequencer, a);
// The F should wait for that signal.
Runnable fr = new SignalledRunnable(afSequencer, null, f);
// You can now add ar and fr to your task queue and f will block until a completes.
}
注意:正如@RealSkeptic 指出的那样,请记住 f
线程将 阻塞 直到 a
完成,因此请确保您的线程池足够大处理系统中可能发生的尽可能多的阻塞线程。