Usage of parallelStream() resulting in unclosed Threads?
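I am writing an RSS media grabber, and one of my parallelStream() calls is leaving threads unclosed.
They are all Daemon Thread [ForkJoinPool.commonPool-worker-xx] threads.
The only difference I could spot is that the two examples are called from different threads. Could that be the problem? Even when I start the work with new Thread().start(), parallelStream() still does not close all the ForkJoinPool threads. I also tried simple objects such as an ArrayList<Integer>, with the same result. Is this the intended behaviour outside the main thread?
Or is it just the behaviour described in the documentation: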
A ForkJoinPool differs from other kinds of ExecutorService mainly by
virtue of employing work-stealing: all threads in the pool attempt to
find and execute tasks submitted to the pool and/or created by other
active tasks (eventually blocking waiting for work if none exist).
import java.util.ArrayList;

// RSSgrab and SubReddit are the asker's own classes (not shown here).
public class MainRoutine {

    // Called from the Swing event dispatch thread.
    public static void startRoutine(SubReddit subReddit) {
        ArrayList<Entry> rssEntry = RSSgrab.pullRss(new SubReddit("pics", true, null));
        rssEntry.parallelStream().forEach(System.out::println); // produces unclosed threads
        System.out.println(Thread.currentThread().getName()); // prints AWT-EventQueue-0
    }

    public static void main(String[] args) {
        ArrayList<Entry> rssEntry = RSSgrab.pullRss(new SubReddit("pics", true, null));
        rssEntry.parallelStream().forEach(System.out::println); // this does not produce unclosed threads
        System.out.println(Thread.currentThread().getName()); // prints main
        // My bad: this also produces unclosed threads, but the runtime is so short that I did not notice.
    }
}
The Entry class:
import java.util.ArrayList;

public class Entry {
    String user;
    String userUri;
    String id;
    String uri;
    String date;
    String title;
    private ArrayList<String> media = new ArrayList<>();

    public Entry(String user, String userUri, String id, String uri, String date, String title) {
        this.user = user;
        this.userUri = userUri;
        this.id = id;
        this.uri = uri;
        this.date = date;
        this.title = title;
    }

    @Override
    public String toString() {
        // Each media item is followed by "; ", including the last one.
        return "Entry [user=" + user + ", userUri=" + userUri + ", id=" + id + ", uri=" + uri + ", date=" + date
                + ", title=" + title + ", media=" + getMedia().stream().map(s -> s + "; ").reduce("", String::concat) + "]";
    }

    public ArrayList<String> getMedia() {
        return media;
    }

    public void setMedia(ArrayList<String> media) {
        this.media = media;
    }
}
Basically ... you are trying to solve a problem that isn't a problem. The common fork-join pool is a managed pool. The JVM takes care of it.
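If it really matters to you, the bad news is that there is probably nothing you can do about it. The common pool ignores shutdown() and shutdownNow() calls. This is by design.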
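To make the point about ignored shutdown requests concrete, here is a minimal sketch. The class and method names (CommonPoolShutdown, shutdownCommonPool) are invented for illustration; only ForkJoinPool.commonPool(), shutdown(), shutdownNow() and isShutdown() come from the JDK. It assumes a standard JDK 8 or later.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolShutdown {

    /** Asks the common pool to shut down, then reports whether it says it is shut down. */
    static boolean shutdownCommonPool() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();     // documented to have no effect on the common pool
        common.shutdownNow();  // likewise ignored
        return common.isShutdown();
    }

    public static void main(String[] args) {
        System.out.println("isShutdown after shutdown(): " + shutdownCommonPool());

        // Parallel streams keep working afterwards, because the pool is still running.
        int sum = IntStream.rangeClosed(1, 100).parallel().sum();
        System.out.println("parallel sum still works: " + sum);
    }
}
```

If the shutdown calls had taken effect, later parallel streams would have no pool to run in, which is presumably why the JDK does not let application code do this.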
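There is a trick that lets you create a custom ForkJoinPool and run your streams in it. See "Custom thread pool in Java 8 parallel stream". When you are finished, you can shut the pool down to make its threads go away.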
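A rough sketch of that trick follows. It relies on behaviour that, as far as I know, is not guaranteed by the Stream documentation: a parallel stream started from inside a ForkJoinPool task runs on that pool's workers rather than on the common pool. The class and method names (CustomPoolStream, squaresInOwnPool) and the squaring workload are made up for the example.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolStream {

    /** Squares 1..n with a parallel stream that runs inside a dedicated, short-lived pool. */
    static List<Integer> squaresInOwnPool(int n, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Submitting the whole stream pipeline as one task makes it execute in 'pool'.
            return pool.submit(() ->
                    IntStream.rangeClosed(1, n).parallel()
                            .map(i -> i * i)
                            .boxed()
                            .collect(Collectors.toList())
            ).join();
        } finally {
            // Unlike the common pool, a custom pool honours shutdown(); its workers then exit.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squaresInOwnPool(5, 2));
    }
}
```

The try/finally matters here: without the shutdown() call, each invocation would leave another pool's worth of idle workers behind.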
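But ... it is probably a bad idea. It is more efficient to reuse an existing pool or the common pool. Creating and destroying threads is expensive, and doing it repeatedly, because you keep creating and destroying pools, is inefficient.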
The threads of the common ForkJoinPool should not be viewed as a thread leak.
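One way to convince yourself of this is to record which threads a parallel stream actually used and check that the pool workers are daemon threads, which do not keep the JVM alive. The following is only a sketch: the class and method names (CommonPoolWorkers, threadsUsedByParallelStream) are hypothetical, and on a machine with very few cores the stream may run mostly or entirely on the calling thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class CommonPoolWorkers {

    /** Runs a parallel stream and returns every thread that executed part of it. */
    static Set<Thread> threadsUsedByParallelStream() {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> seen.add(Thread.currentThread()));
        return seen;
    }

    public static void main(String[] args) {
        // Besides the calling thread, expect names like ForkJoinPool.commonPool-worker-N.
        for (Thread t : threadsUsedByParallelStream()) {
            System.out.println(t.getName() + " daemon=" + t.isDaemon() + " alive=" + t.isAlive());
        }
    }
}
```

The workers typically still show as alive right after the stream finishes. They are idle and waiting for more work, which matches the blocking behaviour described in the ForkJoinPool documentation quoted in the question.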