具有独特任务的线程池队列
Thread pool queue with unique tasks
我正在使用 ThreadPoolTaskExecutor(共 spring)来异步执行一些任务。
所需任务将从外部数据库加载一些对象到我的系统内存中。
我使用的最大线程池大小为 10,最大队列大小为 100。
假设所有 10 个线程都被占用从我的数据库中获取对象并创建一个任务,它将进入队列。现在创建了另一个任务,它应该从数据库中获取相同的对象(数据库中的相同键),它也会进入队列(假设所有 10 个线程仍然被占用)。
所以我的队列可能很容易被重复的任务填满,这些任务将依次执行,我不希望这种情况发生。
我认为解决方案应该以充当线程池队列的唯一集合的形式出现。
在后台,ThreadPoolTaskExecutor 使用不提供唯一性的 LinkedBlockingQueue。
我想到了一些可能的解决方案,但 none 让我满意:
- 使用 ThreadPoolExecutor 而不是 ThreadPoolTaskExecutor。 ThreadPoolExecutor 提供了一个构造函数,可以让我确定线程池队列类型,但它需要实现 BlockingQueue 接口。我找不到保留唯一性的实现。
这让我尝试扩展 LinkedBlockingQueue 并覆盖添加:
public boolean add(E e)
if(!this.contains(e)) {
return super.add(e);
} else {
return false;
}
}
但据我所知,这将导致性能大幅下降,因为 contains
方法受到 O(n) 的限制 - 坏主意。
什么可以解决我的问题?我的目标是获得良好的性能(在内存性能折衷的情况下,我不介意为了性能而放弃内存)。
使用 Guava and ListenableFuture 你可以做类似的事情(尚未测试)
Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
String t1 = "abc";
if(uniqueQueue.add(t1)) {
ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
uniqueQueue.remove(t1);
}
@Override
public void onFailure(Throwable t) {
uniqueQueue.remove(t1);
}
});
}
导致
- 只有当前未处理或未在队列中的项目才会添加到队列中 (
uniqueQueue
)
- 已处理的项目将从
uniqueQueue
中删除
- 队列中最多只能有 100 个项目
此实现不处理
Exceptions
由 submit()
方法抛出
unqiueQueue
中的最大项目数
关于将对象从数据库加载到内存中的要求,您可能需要查看 Guava's Caches。
更新:
如果允许您管理数据库,我建议使用数据库本身来防止重复工作:
- 将 lockid 列添加到您的 table
- 向您的 table 添加一个状态栏(可能是 'new' 和 'done')
- 确保您的数据库隔离级别至少为 READ_COMMITTED
然后在你的主线程中尝试这样的事情:
Random rand = new Random();
int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1;
String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new' " // + AND your conditions + LIMIT ##
String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId;
// now execute those sql statements with QueryRunner or whatever you use in-house
return 来自 select 的行是您添加到队列中的行。
然后,您有一个 class 实现 Runnable,它通过从队列中检索这些行来处理这些行。一旦它处理了一行,您将执行另一个 SQL 更新(在 Runnable 内部)以将 lockId 设置回零并将状态设置为 'done'.
即使您有多台机器,每台机器都有自己的队列,这也能正常工作。
类似于公认解决方案但基于Spring(与番石榴相反)的解决方案:
创建接口RunnableWithId:
public interface RunnableWithId extends Runnable {
/**
* @return A unique id for this task
*/
String getTaskId();
}
创建另一个接口TaskWithIdExecutor:
import org.springframework.core.task.TaskExecutor;
public interface TaskWithIdExecutor extends TaskExecutor {
/**
* Executes the given task if it is not queued or already running
*
* @param task The task to execute
*/
void executeIfNotQueuedOrRunningAlready(RunnableWithId task);
}
创建自定义执行器UniquTaskExecutor:
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.Set;
/**
* In addition to all the abilities of ThreadPoolTaskExecutor adds the ability
* to execute a task only if it is not already running/queued using the
* executeIfNotQueuedOrRunningAlready method.
*
* @see ThreadPoolTaskExecutor
*/
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor {
private Set<String> queuedTasks;
public UniquTaskExecutor() {
queuedTasks = Sets.newConcurrentHashSet();
}
@Override
public void execute(Runnable task) {
super.execute(task);
}
/**
* @param task The task to execute
*/
@Override
public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) {
if (queuedTasks.add(task.getTaskId())) {
ListenableFuture<?> res = submitListenable(task);
res.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable throwable) {
queuedTasks.remove(task.getTaskId());
}
@Override
public void onSuccess(Object o) {
queuedTasks.remove(task.getTaskId());
}
});
}
}
}
使用 UniquTaskExecutor 的 executeIfNotQueuedOrRunningAlready 方法来实现任务执行的唯一性。
我正在使用 ThreadPoolTaskExecutor(共 spring)来异步执行一些任务。
所需任务将从外部数据库加载一些对象到我的系统内存中。 我使用的最大线程池大小为 10,最大队列大小为 100。
假设所有 10 个线程都被占用从我的数据库中获取对象并创建一个任务,它将进入队列。现在创建了另一个任务,它应该从数据库中获取相同的对象(数据库中的相同键),它也会进入队列(假设所有 10 个线程仍然被占用)。
所以我的队列可能很容易被重复的任务填满,这些任务将依次执行,我不希望这种情况发生。
我认为解决方案应该以充当线程池队列的唯一集合的形式出现。 在后台,ThreadPoolTaskExecutor 使用不提供唯一性的 LinkedBlockingQueue。
我想到了一些可能的解决方案,但 none 让我满意:
- 使用 ThreadPoolExecutor 而不是 ThreadPoolTaskExecutor。 ThreadPoolExecutor 提供了一个构造函数,可以让我确定线程池队列类型,但它需要实现 BlockingQueue 接口。我找不到保留唯一性的实现。
这让我尝试扩展 LinkedBlockingQueue 并覆盖添加:
public boolean add(E e)
if(!this.contains(e)) {
return super.add(e);
} else {
return false;
}
}
但据我所知,这将导致性能大幅下降,因为 contains
方法受到 O(n) 的限制 - 坏主意。
什么可以解决我的问题?我的目标是获得良好的性能(在内存性能折衷的情况下,我不介意为了性能而放弃内存)。
使用 Guava and ListenableFuture 你可以做类似的事情(尚未测试)
Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
String t1 = "abc";
if(uniqueQueue.add(t1)) {
ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
uniqueQueue.remove(t1);
}
@Override
public void onFailure(Throwable t) {
uniqueQueue.remove(t1);
}
});
}
导致
- 只有当前未处理或未在队列中的项目才会添加到队列中 (
uniqueQueue
) - 已处理的项目将从
uniqueQueue
中删除
- 队列中最多只能有 100 个项目
此实现不处理
Exceptions
由submit()
方法抛出unqiueQueue
中的最大项目数
关于将对象从数据库加载到内存中的要求,您可能需要查看 Guava's Caches。
更新:
如果允许您管理数据库,我建议使用数据库本身来防止重复工作:
- 将 lockid 列添加到您的 table
- 向您的 table 添加一个状态栏(可能是 'new' 和 'done')
- 确保您的数据库隔离级别至少为 READ_COMMITTED
然后在你的主线程中尝试这样的事情:
Random rand = new Random();
int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1;
String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new' " // + AND your conditions + LIMIT ##
String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId;
// now execute those sql statements with QueryRunner or whatever you use in-house
return 来自 select 的行是您添加到队列中的行。
然后,您有一个 class 实现 Runnable,它通过从队列中检索这些行来处理这些行。一旦它处理了一行,您将执行另一个 SQL 更新(在 Runnable 内部)以将 lockId 设置回零并将状态设置为 'done'.
即使您有多台机器,每台机器都有自己的队列,这也能正常工作。
类似于公认解决方案但基于Spring(与番石榴相反)的解决方案:
创建接口RunnableWithId:
public interface RunnableWithId extends Runnable {
/**
* @return A unique id for this task
*/
String getTaskId();
}
创建另一个接口TaskWithIdExecutor:
import org.springframework.core.task.TaskExecutor;
public interface TaskWithIdExecutor extends TaskExecutor {
/**
* Executes the given task if it is not queued or already running
*
* @param task The task to execute
*/
void executeIfNotQueuedOrRunningAlready(RunnableWithId task);
}
创建自定义执行器UniquTaskExecutor:
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.Set;
/**
* In addition to all the abilities of ThreadPoolTaskExecutor adds the ability
* to execute a task only if it is not already running/queued using the
* executeIfNotQueuedOrRunningAlready method.
*
* @see ThreadPoolTaskExecutor
*/
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor {
private Set<String> queuedTasks;
public UniquTaskExecutor() {
queuedTasks = Sets.newConcurrentHashSet();
}
@Override
public void execute(Runnable task) {
super.execute(task);
}
/**
* @param task The task to execute
*/
@Override
public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) {
if (queuedTasks.add(task.getTaskId())) {
ListenableFuture<?> res = submitListenable(task);
res.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable throwable) {
queuedTasks.remove(task.getTaskId());
}
@Override
public void onSuccess(Object o) {
queuedTasks.remove(task.getTaskId());
}
});
}
}
}
使用 UniquTaskExecutor 的 executeIfNotQueuedOrRunningAlready 方法来实现任务执行的唯一性。