Java 多线程内部 class 调用外部 class
Java multithreading inner class calling outer class
我有一个实现可运行的内部 class,它发出 HTTP 请求,然后调用外部 class 的函数来处理响应。预期行为是将所有响应附加到对象列表。
示例代码:
public class Status {
private ExecutorService executor;
CloseableHttpClient httpClient;
List<String> results = new LinkedList<>();
public Status() {
executor = Executors.newFixedThreadPool(5);
httpClient = HttpClients.createDefault();
}
public handleInput(Entity entity) {
String result = EntityUtils.toString(httpEntity);
results.add(result);
}
private class Action implements Runnable {
@Override
public void run() {
try {
//do some http calls
// response = httpClient.execute(httpPost);
handleInput(response.getEntity())
} catch (SomeException e) {
lastResult = false;
}
}
}}
在我的测试中,我没有遇到任何问题,但我想知道将多个线程的结果添加到同一个链表是否是线程安全的操作,如果不是,这种情况下的最佳设计是什么.
不是线程安全的
不,跨线程操作非线程安全 List
不是 thread-safe。
Synchronized
您可以将 handleInput
方法 synchronized
设为 commented by Bodewes。当然,该方法成为潜在的瓶颈,因为一次只能有一个线程调用同步方法。也许您的情况不是问题,但请注意。
线程安全集合
另一种选择是用线程安全的集合替换您的 LinkedList
。例如,CopyOnWriteArrayList
, CopyOnWriteArraySet
, or ConcurrentSkipListSet
.
Callable
& Future
但我建议您不要将 Entity
对象的多线程生成与收集和处理这些对象混为一谈。分配给后台线程的每个任务都应尽可能“管好自己的事”。让任务共享一个列表会使它们不必要地跨线程纠缠在一起;应该尽可能避免这种纠缠(共享资源)。
将您的任务从 Runnable
更改为 Callable
,以便 return 成为一个值。 return 值将是每个 Entity
产生的。
当您将每个 Callable
提交给执行程序服务时,您会返回一个 Future
对象。收集那些对象。通过这些对象中的每一个,您都可以访问每个任务的工作结果。
等待执行器服务完成所有提交的任务。然后检查每个Future
.
的结果
通过使用 Future
对象来收集后台线程产生的结果,并且只在 完成后才处理这些结果,我们已经消除了需要使您的 results
集合线程安全。原始线程将这些结果收集到列表中,而不是每个线程都添加到列表中。
请注意,在下面的示例代码中,我们没有在后台线程中的任务 运行 之间共享任何资源。每个任务都做自己的事情,通过其特定的 Future
对象报告自己的结果。任务不再访问共享 List
.
顺便说一下,请注意此示例代码没有像问题中的代码那样将执行程序服务保留在成员字段中。在我们这里的情况下,执行程序服务应该 (1) 实例化,(2) 使用,以及 (3) 在一次操作中全部关闭。您 必须 在不再需要时(或当您的应用程序退出时)关闭您的执行程序服务。否则它的后台线程池可能运行无限期,像个僵尸♂️。
示例代码
package work.basil.threading;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.*;
public class Status
{
record Entity( UUID uuid , Instant instant ) { }
public List < String > process ()
{
ExecutorService executorService = Executors.newFixedThreadPool( 5 );
List < WebCallTask > tasks = List.of();
try
{
tasks = List.of(
new WebCallTask( new URI( "http://www.Google.com/" ) ) ,
new WebCallTask( new URI( "http://www.DuckDuckGo.com/" ) ) ,
new WebCallTask( new URI( "http://www.Adoptium.net/" ) )
);
} catch ( URISyntaxException e )
{
e.printStackTrace();
}
List < Future < Entity > > futures = List.of();
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }
executorService.shutdown();
try { executorService.awaitTermination( 2 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
List < String > results = new ArrayList <>( tasks.size() );
for ( Future < Entity > future : futures )
{
try
{
Entity entity = future.get();
String result = this.handleInput( entity );
results.add( result );
} catch ( InterruptedException e )
{
e.printStackTrace();
} catch ( ExecutionException e )
{
e.printStackTrace();
}
}
return results;
}
public String handleInput ( Entity entity )
{
if ( Objects.isNull( entity ) ) return "Not Available.";
return entity.toString();
}
private class WebCallTask implements Callable < Entity >
{
private URI uri;
public WebCallTask ( URI uri )
{
this.uri = uri;
}
@Override
public Entity call ()
{
Entity entity = null;
try
{
// Perform some http calls.
// response = httpClient.execute(httpPost);
// Pretend to wait on network call by sleeping.
System.out.println( "Thread: " + Thread.currentThread().getId() + " is sleeping, to pretend doing network call. " + Instant.now() );
try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 3 , 11 ) ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
entity = new Entity( UUID.randomUUID() , Instant.now() );
System.out.println( "Thread: " + Thread.currentThread().getId() + " produced an `Entity` object. Task done. " + Instant.now() );
} catch ( Exception e ) // In your real code, you would be catching networking errors related to your networkcall.
{
e.printStackTrace();
} finally
{
return entity; // May return `null` as a legitimate value. In real work I would use `Optional< Entity >` here to signal that `null` is a possible and legitimate value. But let's not overcomplicate this example code.
}
}
}
public static void main ( String[] args )
{
System.out.println( "Thread: " + Thread.currentThread().getId() + " is starting demo. " + Instant.now() );
Status statusApp = new Status();
List < String > output = statusApp.process();
System.out.println( "output = " + output );
System.out.println( "Thread: " + Thread.currentThread().getId() + " is ending demo. " + Instant.now() );
}
}
当运行.
Thread: 1 is starting demo. 2021-10-09T03:58:41.269177Z
Thread: 15 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.286424Z
Thread: 16 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.286828Z
Thread: 17 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.288108Z
Thread: 16 produced an `Entity` object. Task done. 2021-10-09T03:58:44.323703Z
Thread: 15 produced an `Entity` object. Task done. 2021-10-09T03:58:46.294364Z
Thread: 17 produced an `Entity` object. Task done. 2021-10-09T03:58:46.294269Z
output = [Entity[uuid=04d73a52-79ec-4a61-becb-ce056d3aa9fa, instant=2021-10-09T03:58:46.294359Z], Entity[uuid=cc5a7266-4101-41bb-b806-8b29b77a82d0, instant=2021-10-09T03:58:44.323688Z], Entity[uuid=3cc24ad9-3ea1-4a24-98d0-c3df4bf161b6, instant=2021-10-09T03:58:46.294254Z]]
Thread: 1 is ending demo. 2021-10-09T03:58:46.321313Z
我有一个实现可运行的内部 class,它发出 HTTP 请求,然后调用外部 class 的函数来处理响应。预期行为是将所有响应附加到对象列表。
示例代码:
public class Status {
private ExecutorService executor;
CloseableHttpClient httpClient;
List<String> results = new LinkedList<>();
public Status() {
executor = Executors.newFixedThreadPool(5);
httpClient = HttpClients.createDefault();
}
public handleInput(Entity entity) {
String result = EntityUtils.toString(httpEntity);
results.add(result);
}
private class Action implements Runnable {
@Override
public void run() {
try {
//do some http calls
// response = httpClient.execute(httpPost);
handleInput(response.getEntity())
} catch (SomeException e) {
lastResult = false;
}
}
}}
在我的测试中,我没有遇到任何问题,但我想知道将多个线程的结果添加到同一个链表是否是线程安全的操作,如果不是,这种情况下的最佳设计是什么.
不是线程安全的
不,跨线程操作非线程安全 List
不是 thread-safe。
Synchronized
您可以将 handleInput
方法 synchronized
设为 commented by Bodewes。当然,该方法成为潜在的瓶颈,因为一次只能有一个线程调用同步方法。也许您的情况不是问题,但请注意。
线程安全集合
另一种选择是用线程安全的集合替换您的 LinkedList
。例如,CopyOnWriteArrayList
, CopyOnWriteArraySet
, or ConcurrentSkipListSet
.
Callable
& Future
但我建议您不要将 Entity
对象的多线程生成与收集和处理这些对象混为一谈。分配给后台线程的每个任务都应尽可能“管好自己的事”。让任务共享一个列表会使它们不必要地跨线程纠缠在一起;应该尽可能避免这种纠缠(共享资源)。
将您的任务从 Runnable
更改为 Callable
,以便 return 成为一个值。 return 值将是每个 Entity
产生的。
当您将每个 Callable
提交给执行程序服务时,您会返回一个 Future
对象。收集那些对象。通过这些对象中的每一个,您都可以访问每个任务的工作结果。
等待执行器服务完成所有提交的任务。然后检查每个Future
.
通过使用 Future
对象来收集后台线程产生的结果,并且只在 完成后才处理这些结果,我们已经消除了需要使您的 results
集合线程安全。原始线程将这些结果收集到列表中,而不是每个线程都添加到列表中。
请注意,在下面的示例代码中,我们没有在后台线程中的任务 运行 之间共享任何资源。每个任务都做自己的事情,通过其特定的 Future
对象报告自己的结果。任务不再访问共享 List
.
顺便说一下,请注意此示例代码没有像问题中的代码那样将执行程序服务保留在成员字段中。在我们这里的情况下,执行程序服务应该 (1) 实例化,(2) 使用,以及 (3) 在一次操作中全部关闭。您 必须 在不再需要时(或当您的应用程序退出时)关闭您的执行程序服务。否则它的后台线程池可能运行无限期,像个僵尸♂️。
示例代码
package work.basil.threading;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.*;
public class Status
{
record Entity( UUID uuid , Instant instant ) { }
public List < String > process ()
{
ExecutorService executorService = Executors.newFixedThreadPool( 5 );
List < WebCallTask > tasks = List.of();
try
{
tasks = List.of(
new WebCallTask( new URI( "http://www.Google.com/" ) ) ,
new WebCallTask( new URI( "http://www.DuckDuckGo.com/" ) ) ,
new WebCallTask( new URI( "http://www.Adoptium.net/" ) )
);
} catch ( URISyntaxException e )
{
e.printStackTrace();
}
List < Future < Entity > > futures = List.of();
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }
executorService.shutdown();
try { executorService.awaitTermination( 2 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
List < String > results = new ArrayList <>( tasks.size() );
for ( Future < Entity > future : futures )
{
try
{
Entity entity = future.get();
String result = this.handleInput( entity );
results.add( result );
} catch ( InterruptedException e )
{
e.printStackTrace();
} catch ( ExecutionException e )
{
e.printStackTrace();
}
}
return results;
}
public String handleInput ( Entity entity )
{
if ( Objects.isNull( entity ) ) return "Not Available.";
return entity.toString();
}
private class WebCallTask implements Callable < Entity >
{
private URI uri;
public WebCallTask ( URI uri )
{
this.uri = uri;
}
@Override
public Entity call ()
{
Entity entity = null;
try
{
// Perform some http calls.
// response = httpClient.execute(httpPost);
// Pretend to wait on network call by sleeping.
System.out.println( "Thread: " + Thread.currentThread().getId() + " is sleeping, to pretend doing network call. " + Instant.now() );
try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 3 , 11 ) ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
entity = new Entity( UUID.randomUUID() , Instant.now() );
System.out.println( "Thread: " + Thread.currentThread().getId() + " produced an `Entity` object. Task done. " + Instant.now() );
} catch ( Exception e ) // In your real code, you would be catching networking errors related to your networkcall.
{
e.printStackTrace();
} finally
{
return entity; // May return `null` as a legitimate value. In real work I would use `Optional< Entity >` here to signal that `null` is a possible and legitimate value. But let's not overcomplicate this example code.
}
}
}
public static void main ( String[] args )
{
System.out.println( "Thread: " + Thread.currentThread().getId() + " is starting demo. " + Instant.now() );
Status statusApp = new Status();
List < String > output = statusApp.process();
System.out.println( "output = " + output );
System.out.println( "Thread: " + Thread.currentThread().getId() + " is ending demo. " + Instant.now() );
}
}
当运行.
Thread: 1 is starting demo. 2021-10-09T03:58:41.269177Z
Thread: 15 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.286424Z
Thread: 16 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.286828Z
Thread: 17 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.288108Z
Thread: 16 produced an `Entity` object. Task done. 2021-10-09T03:58:44.323703Z
Thread: 15 produced an `Entity` object. Task done. 2021-10-09T03:58:46.294364Z
Thread: 17 produced an `Entity` object. Task done. 2021-10-09T03:58:46.294269Z
output = [Entity[uuid=04d73a52-79ec-4a61-becb-ce056d3aa9fa, instant=2021-10-09T03:58:46.294359Z], Entity[uuid=cc5a7266-4101-41bb-b806-8b29b77a82d0, instant=2021-10-09T03:58:44.323688Z], Entity[uuid=3cc24ad9-3ea1-4a24-98d0-c3df4bf161b6, instant=2021-10-09T03:58:46.294254Z]]
Thread: 1 is ending demo. 2021-10-09T03:58:46.321313Z