使用一个 Java 8 Consumer 作为 Runnable Callback - 线程安全吗?
Using one Java 8 Consumer as Runnable Callback - is threadsafe?
我有一个有 4 个活动线程的 ScheduledThreadPoolExecutor
。它充满了一堆任务,其中每个任务处理一大块项目。
每个任务必须有 3 个回调:开始、结束和每个已处理项目后的回调。
每次回调都会触发我的数据库更新。这是一个有点长的 运行 任务。
这是一段示例代码,应该可以说明我在做什么:
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4);
Consumer<String> processed = (String o) -> {
System.err.println("PROCESSED: " + o);
try { Thread.sleep(10); }
catch (Exception e) { e.printStackTrace(); }
};
for(int i=0; i<10; i++) {
executor.schedule(
new ChunkTask("task"+i, processed),
500,
TimeUnit.MILLISECONDS
);
}
}
public static class ChunkTask implements Runnable {
String taskId;
Consumer<String> processedCallback;
public ChunkTask(String taskId, Consumer<String> processedCallback) {
this.taskId = taskId;
this.processedCallback = processedCallback;
}
@Override
public void run() {
for(int i=0; i<50; i++) {
processedCallback.accept(taskId+" "+i);
}
}
}
我只是省略了开始和结束回调,因为它与处理后的回调基本相同。
如您所见,我创建了一个 Consumer
对象。其中有一个Thread.sleep(10)
来模拟数据库访问。该对象被所有 4 个线程并行调用。
我想知道这是否是线程安全的。 在我看来,Consumer
只是一个无状态的对象,有一个无状态的方法。虽然它可以并行调用任意多次。
我说得对吗?
编辑:我知道我的回调是同步的。这只是为了测试。稍后我想让它异步。
是的,您的消费者没有任何状态,因此它是线程安全的。它使用的唯一共享对象是 System.err
,它本身是线程安全的。
当然,在实际代码中,线程安全将取决于您的操作,而不是打印到 System.err
。如果您使用共享数据库服务并且该服务不是线程安全的,那么您就会遇到问题。
我有一个有 4 个活动线程的 ScheduledThreadPoolExecutor
。它充满了一堆任务,其中每个任务处理一大块项目。
每个任务必须有 3 个回调:开始、结束和每个已处理项目后的回调。
每次回调都会触发我的数据库更新。这是一个有点长的 运行 任务。
这是一段示例代码,应该可以说明我在做什么:
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4);
Consumer<String> processed = (String o) -> {
System.err.println("PROCESSED: " + o);
try { Thread.sleep(10); }
catch (Exception e) { e.printStackTrace(); }
};
for(int i=0; i<10; i++) {
executor.schedule(
new ChunkTask("task"+i, processed),
500,
TimeUnit.MILLISECONDS
);
}
}
public static class ChunkTask implements Runnable {
String taskId;
Consumer<String> processedCallback;
public ChunkTask(String taskId, Consumer<String> processedCallback) {
this.taskId = taskId;
this.processedCallback = processedCallback;
}
@Override
public void run() {
for(int i=0; i<50; i++) {
processedCallback.accept(taskId+" "+i);
}
}
}
我只是省略了开始和结束回调,因为它与处理后的回调基本相同。
如您所见,我创建了一个 Consumer
对象。其中有一个Thread.sleep(10)
来模拟数据库访问。该对象被所有 4 个线程并行调用。
我想知道这是否是线程安全的。 在我看来,Consumer
只是一个无状态的对象,有一个无状态的方法。虽然它可以并行调用任意多次。
我说得对吗?
编辑:我知道我的回调是同步的。这只是为了测试。稍后我想让它异步。
是的,您的消费者没有任何状态,因此它是线程安全的。它使用的唯一共享对象是 System.err
,它本身是线程安全的。
当然,在实际代码中,线程安全将取决于您的操作,而不是打印到 System.err
。如果您使用共享数据库服务并且该服务不是线程安全的,那么您就会遇到问题。