将列表值顺序传递给单值消费者的最佳方式?
Best way to sequentially pass list values to single value consumer?
我正在玩弄 Java8 的流和 CompletableFuture
。我预先存在的代码有一个 class 需要一个 URL 并下载它:
public class FileDownloader implements Runnable {
private URL target;
public FileDownloader(String target) {
this.target = new URL(target);
}
public void run() { /* do it */ }
}
现在,这个 class 从发出 List<String>
(单个主机上的多个目标)的另一个部分获取信息。
我把周围的代码改成了CompletableFuture
:
public class Downloader {
public static void main(String[] args) {
List<String> hosts = fetchTargetHosts();
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync((files) -> {
for (String target : files) {
new FileDownloader(target).run();
}
});
}
}
public static class HostDownloader implements Supplier<List<String>> {
/* not shown */
}
/* My implementation should either be Runnable or Consumer.
Please suggest based on a idiomatic approach to the main loop.
*/
public static class FileDownloader implements Runnable, Consumer<String> {
private String target;
public FileDownloader(String target) {
this.target = target;
}
@Override
public void run() { accept(this.target); }
@Override
public void accept(String target) {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
}
}
}
现在,这感觉不自然。我正在生成一个 String
流,而我的 FileDownloader
一次消耗其中一个。是否有现成的方法可以使我的单个值 Consumer
与 List
一起使用,或者我是否坚持使用 for
循环?
我知道将循环移动到 accept
中并制作一个 Consumer<List<String>>
是微不足道的,这不是重点。
我认为你需要像那样做 forEach:
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync(files ->
files.stream()
.forEach(target -> new FileDownloader(target).run())
);
}
顺便说一下,您可以对主循环执行相同的操作...
编辑:
由于 OP 编辑了原始 post,添加了 FileDownloader 的实现细节,因此我正在相应地编辑我的答案。 Java 8 功能接口旨在允许使用 lambda expr 代替具体 Class。它并不意味着像常规接口 9 那样实现,尽管它可以)因此,"to take advantage of" Java 8 消费者意味着用这样的接受代码替换 FileDownloader:
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync(files ->
files.forEach(target -> {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
})
);
}
另一种选择是:
CompletableFuture.supplyAsync(worker)
.thenApply(list -> list.stream().map(FileDownloader::new))
.thenAccept(s -> s.forEach(FileDownloader::run));
将两个直接相关的步骤分解为两个异步步骤没有意义。他们仍然相依为命,如果分离有任何影响,那也不会是积极的。
您可以简单地使用
List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts)
CompletableFuture.runAsync(()->
new HostDownloader(host).get().forEach(fileDownloader));
或者,假设 FileDownloader
没有关于下载的可变状态:
for(String host: hosts)
CompletableFuture.runAsync(()->
new HostDownloader(host).get().parallelStream().forEach(fileDownloader));
这仍然具有与使用 supplyAsync
加 thenAcceptAsync
的原始方法相同的并发级别,只是因为这两个相关步骤无论如何不能同时 运行,所以简单解决方案是将这两个步骤放在一个将异步执行的简洁操作中。
但是,此时值得注意的是,不建议将 CompletableFuture
全部用于此操作。正如 it’s documentation 所述:
- All async methods without an explicit Executor argument are performed using the
ForkJoinPool.commonPool()
公共池的问题在于其预配置的并发级别取决于 CPU 核心的数量,如果线程在 I/O 操作期间被阻塞,则不会进行调整。换句话说,它不适合I/O操作。
与 Stream
不同,CompletableFuture
允许您为 async 操作指定 Executor
,因此您可以配置自己的 Executor
适合 I/O 操作,另一方面,当你处理一个 Executor
时,根本不需要 CompletableFuture
,至少不需要这样的简单任务:
List<String> hosts = fetchTargetHosts();
int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
for(String target: new HostDownloader(host).get())
connEs.execute(()->fileDownloader.accept(target));
});
在这个地方,您可以考虑将 FileDownloader.accept
的代码内联到 lambda 表达式中,或者将其还原为 Runnable
,以便您可以将内部循环的语句更改为 connEs.execute(new FileDownloader(target))
.
我正在玩弄 Java8 的流和 CompletableFuture
。我预先存在的代码有一个 class 需要一个 URL 并下载它:
public class FileDownloader implements Runnable {
private URL target;
public FileDownloader(String target) {
this.target = new URL(target);
}
public void run() { /* do it */ }
}
现在,这个 class 从发出 List<String>
(单个主机上的多个目标)的另一个部分获取信息。
我把周围的代码改成了CompletableFuture
:
public class Downloader {
public static void main(String[] args) {
List<String> hosts = fetchTargetHosts();
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync((files) -> {
for (String target : files) {
new FileDownloader(target).run();
}
});
}
}
public static class HostDownloader implements Supplier<List<String>> {
/* not shown */
}
/* My implementation should either be Runnable or Consumer.
Please suggest based on a idiomatic approach to the main loop.
*/
public static class FileDownloader implements Runnable, Consumer<String> {
private String target;
public FileDownloader(String target) {
this.target = target;
}
@Override
public void run() { accept(this.target); }
@Override
public void accept(String target) {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
}
}
}
现在,这感觉不自然。我正在生成一个 String
流,而我的 FileDownloader
一次消耗其中一个。是否有现成的方法可以使我的单个值 Consumer
与 List
一起使用,或者我是否坚持使用 for
循环?
我知道将循环移动到 accept
中并制作一个 Consumer<List<String>>
是微不足道的,这不是重点。
我认为你需要像那样做 forEach:
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync(files ->
files.stream()
.forEach(target -> new FileDownloader(target).run())
);
}
顺便说一下,您可以对主循环执行相同的操作...
编辑: 由于 OP 编辑了原始 post,添加了 FileDownloader 的实现细节,因此我正在相应地编辑我的答案。 Java 8 功能接口旨在允许使用 lambda expr 代替具体 Class。它并不意味着像常规接口 9 那样实现,尽管它可以)因此,"to take advantage of" Java 8 消费者意味着用这样的接受代码替换 FileDownloader:
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync(files ->
files.forEach(target -> {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
})
);
}
另一种选择是:
CompletableFuture.supplyAsync(worker)
.thenApply(list -> list.stream().map(FileDownloader::new))
.thenAccept(s -> s.forEach(FileDownloader::run));
将两个直接相关的步骤分解为两个异步步骤没有意义。他们仍然相依为命,如果分离有任何影响,那也不会是积极的。
您可以简单地使用
List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts)
CompletableFuture.runAsync(()->
new HostDownloader(host).get().forEach(fileDownloader));
或者,假设 FileDownloader
没有关于下载的可变状态:
for(String host: hosts)
CompletableFuture.runAsync(()->
new HostDownloader(host).get().parallelStream().forEach(fileDownloader));
这仍然具有与使用 supplyAsync
加 thenAcceptAsync
的原始方法相同的并发级别,只是因为这两个相关步骤无论如何不能同时 运行,所以简单解决方案是将这两个步骤放在一个将异步执行的简洁操作中。
但是,此时值得注意的是,不建议将 CompletableFuture
全部用于此操作。正如 it’s documentation 所述:
- All async methods without an explicit Executor argument are performed using the
ForkJoinPool.commonPool()
公共池的问题在于其预配置的并发级别取决于 CPU 核心的数量,如果线程在 I/O 操作期间被阻塞,则不会进行调整。换句话说,它不适合I/O操作。
与 Stream
不同,CompletableFuture
允许您为 async 操作指定 Executor
,因此您可以配置自己的 Executor
适合 I/O 操作,另一方面,当你处理一个 Executor
时,根本不需要 CompletableFuture
,至少不需要这样的简单任务:
List<String> hosts = fetchTargetHosts();
int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
for(String target: new HostDownloader(host).get())
connEs.execute(()->fileDownloader.accept(target));
});
在这个地方,您可以考虑将 FileDownloader.accept
的代码内联到 lambda 表达式中,或者将其还原为 Runnable
,以便您可以将内部循环的语句更改为 connEs.execute(new FileDownloader(target))
.