是否有必要在 ThreadFactory 中使用 AtomicInteger?
Is it necessary to use AtomicInteger in a ThreadFactory?
我认为有必要在 ThreadFactory 中使用 AtomicInteger
但是当我试图向自己证明它时,我失败了。
new ThreadFactory() {
private int threadId = 0; <---- AtomicInteger preferred
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("my-thread-" + (threadId++)); <--- dangerous code
return t;
}
}
如果出现多个请求,线程工厂将生成线程来处理它们,并且在生成过程中,可能存在间隙,在此期间会出现竞争条件 可能会潜入。
我尝试使用以下代码来证明我的理论,但 2_000 核心线程根本不会发生这种情况。
@Slf4j
public class ThreadFactoryTest {
private ConcurrentHashMap<String, Thread> threadIdThreadMap = new ConcurrentHashMap<>();
private ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2000, 2000, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100000), new ThreadFactory() {
private int threadId = 0;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("my-thread-" + (threadId++));
if (threadIdThreadMap.contains(t.getName())) {
log.error("already existed");
System.err.println(myExecutor);
myExecutor.shutdownNow();
} else threadIdThreadMap.put(t.getName(), t);
return t;
}
}, new ThreadPoolExecutor.AbortPolicy());
@Test
public void testThreadFactory() throws Exception {
for (int i = 0; i < 100; ++i) {
new Thread(() -> runOneHundredJobs()).start();
}
Thread.sleep(1000000);
myExecutor.shutdown();
myExecutor.awaitTermination(100, TimeUnit.MINUTES);
}
private void runOneHundredJobs() {
log.info("{} starting to submit tasks", Thread.currentThread().getName());
for (int i = 0; i < 100; ++i) {
myExecutor.execute(() -> {
while (100 < System.currentTimeMillis()) {
try {
Thread.sleep(1000);
if (Math.random() > 0.99) break;
System.out.println(Thread.currentThread().getName());
System.out.println(myExecutor);
} catch (Exception e) {
}
}
} );
}
}
}
看起来像一个愚蠢的问题,因为我一直都知道“很难为多线程竞争条件创造间隙”。
任何 help/clue 将不胜感激 ;)
更新
非常感谢@StephenC 和@Slaw 一路上的帮助。很抱歉我误解了那里的一些要点;(
所以 newThread
应该以 线程安全 方式实现,然后在我的例子中, AtomicInteger
是必需的。我想引用 StephenC 的话:
Failure to demonstrate a race condition doesn't mean it doesn't exist.
Is it necessary to use AtomicInteger in a ThreadFactory?
这将取决于工厂对象的使用方式。
如果您为 ThreadPoolExecutor
的每个实例提供不同的工厂对象,那么工厂的(实际)并发要求将取决于执行程序如何使用它。如果 javadoc 中没有语句,您将需要检查源代码。我没有检查过,但我怀疑线程池的扩展(包括对 newThread
的调用)发生在互斥体内部。如果我的怀疑是正确的,那么这个用例不需要工厂对象是线程安全的。
UPDATE - 我现在已经检查过了,我的怀疑是 不正确(对于 Java 8 和 12)。 newThread
调用是在创建新的 Worker
对象时进行的,而在持有互斥锁时不会进行。因此,您的 newThread
方法在此上下文中也需要是线程安全的。
如果工厂对象与其他事物(例如另一个执行程序)共享,那么您是正确的:您的 newThread
方法需要是线程安全的。
我没有查看您的代码来尝试显示竞争条件,但在我看来,这不是解决此问题的最佳方法。代码检查和推理是更好的方法。未能证明竞争条件并不意味着它不存在。
我正在简化测试以使预期的结果从水下浮出水面。
通过下面的测试,预期的线程大小是 1000
,而使用 int
会经常给出 less
大小(在我的 macOS 中是 994、996、999),并非所有时候。
public class ThreadFactoryTest {
private ConcurrentHashMap<String, Thread> threadIdThreadMap = new ConcurrentHashMap<>();
private ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2000, 2000, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100000), new ThreadFactory() {
private int threadId = 0;
private AtomicInteger atomicThreadId = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("my-thread-" + (threadId++));
// uncomment this line, the thread size will be less than 1000
t.setName("my-thread-" + (atomicThreadId.getAndIncrement()));
threadIdThreadMap.put(t.getName(), t);
return t;
}
}, new ThreadPoolExecutor.AbortPolicy());
@Test
public void testThreadFactory() throws Exception {
for (int i = 0; i < 50; ++i) {
new Thread(() -> runOneHundredJobs()).start();
}
Thread.sleep(1000000);
myExecutor.shutdown();
myExecutor.awaitTermination(100, TimeUnit.MINUTES);
}
private void runOneHundredJobs() {
for (int i = 0; i < 20; ++i) {
myExecutor.execute(() -> {
while (100 < System.currentTimeMillis()) {
try {
Thread.sleep(1000);
log.warn("count: {}", threadIdThreadMap.size());
} catch (Exception e) {
}
}
});
}
}
}
我认为有必要在 ThreadFactory 中使用 AtomicInteger
但是当我试图向自己证明它时,我失败了。
new ThreadFactory() {
private int threadId = 0; <---- AtomicInteger preferred
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("my-thread-" + (threadId++)); <--- dangerous code
return t;
}
}
如果出现多个请求,线程工厂将生成线程来处理它们,并且在生成过程中,可能存在间隙,在此期间会出现竞争条件 可能会潜入。
我尝试使用以下代码来证明我的理论,但 2_000 核心线程根本不会发生这种情况。
@Slf4j
public class ThreadFactoryTest {
private ConcurrentHashMap<String, Thread> threadIdThreadMap = new ConcurrentHashMap<>();
private ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2000, 2000, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100000), new ThreadFactory() {
private int threadId = 0;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("my-thread-" + (threadId++));
if (threadIdThreadMap.contains(t.getName())) {
log.error("already existed");
System.err.println(myExecutor);
myExecutor.shutdownNow();
} else threadIdThreadMap.put(t.getName(), t);
return t;
}
}, new ThreadPoolExecutor.AbortPolicy());
@Test
public void testThreadFactory() throws Exception {
for (int i = 0; i < 100; ++i) {
new Thread(() -> runOneHundredJobs()).start();
}
Thread.sleep(1000000);
myExecutor.shutdown();
myExecutor.awaitTermination(100, TimeUnit.MINUTES);
}
private void runOneHundredJobs() {
log.info("{} starting to submit tasks", Thread.currentThread().getName());
for (int i = 0; i < 100; ++i) {
myExecutor.execute(() -> {
while (100 < System.currentTimeMillis()) {
try {
Thread.sleep(1000);
if (Math.random() > 0.99) break;
System.out.println(Thread.currentThread().getName());
System.out.println(myExecutor);
} catch (Exception e) {
}
}
} );
}
}
}
看起来像一个愚蠢的问题,因为我一直都知道“很难为多线程竞争条件创造间隙”。
任何 help/clue 将不胜感激 ;)
更新
非常感谢@StephenC 和@Slaw 一路上的帮助。很抱歉我误解了那里的一些要点;(
所以 newThread
应该以 线程安全 方式实现,然后在我的例子中, AtomicInteger
是必需的。我想引用 StephenC 的话:
Failure to demonstrate a race condition doesn't mean it doesn't exist.
Is it necessary to use AtomicInteger in a ThreadFactory?
这将取决于工厂对象的使用方式。
如果您为
ThreadPoolExecutor
的每个实例提供不同的工厂对象,那么工厂的(实际)并发要求将取决于执行程序如何使用它。如果 javadoc 中没有语句,您将需要检查源代码。我没有检查过,但我怀疑线程池的扩展(包括对newThread
的调用)发生在互斥体内部。如果我的怀疑是正确的,那么这个用例不需要工厂对象是线程安全的。UPDATE - 我现在已经检查过了,我的怀疑是 不正确(对于 Java 8 和 12)。
newThread
调用是在创建新的Worker
对象时进行的,而在持有互斥锁时不会进行。因此,您的newThread
方法在此上下文中也需要是线程安全的。如果工厂对象与其他事物(例如另一个执行程序)共享,那么您是正确的:您的
newThread
方法需要是线程安全的。
我没有查看您的代码来尝试显示竞争条件,但在我看来,这不是解决此问题的最佳方法。代码检查和推理是更好的方法。未能证明竞争条件并不意味着它不存在。
我正在简化测试以使预期的结果从水下浮出水面。
通过下面的测试,预期的线程大小是 1000
,而使用 int
会经常给出 less
大小(在我的 macOS 中是 994、996、999),并非所有时候。
public class ThreadFactoryTest {
private ConcurrentHashMap<String, Thread> threadIdThreadMap = new ConcurrentHashMap<>();
private ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2000, 2000, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100000), new ThreadFactory() {
private int threadId = 0;
private AtomicInteger atomicThreadId = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("my-thread-" + (threadId++));
// uncomment this line, the thread size will be less than 1000
t.setName("my-thread-" + (atomicThreadId.getAndIncrement()));
threadIdThreadMap.put(t.getName(), t);
return t;
}
}, new ThreadPoolExecutor.AbortPolicy());
@Test
public void testThreadFactory() throws Exception {
for (int i = 0; i < 50; ++i) {
new Thread(() -> runOneHundredJobs()).start();
}
Thread.sleep(1000000);
myExecutor.shutdown();
myExecutor.awaitTermination(100, TimeUnit.MINUTES);
}
private void runOneHundredJobs() {
for (int i = 0; i < 20; ++i) {
myExecutor.execute(() -> {
while (100 < System.currentTimeMillis()) {
try {
Thread.sleep(1000);
log.warn("count: {}", threadIdThreadMap.size());
} catch (Exception e) {
}
}
});
}
}
}