如何使用 stream parallel 和 filter 以及 findFirst shortcircuit

How use stream parallel and filter and findFirst shortcircuit

我有一个具有不同复杂性(处理持续时间)的 TxnType 列表。
我想从列表中找到匹配的 TxnType
我尝试通过混合流的并行处理和短路过滤功能来实现它。
但我注意到它们并没有混合在一起。
我写了下面的示例。但是注意到并联和短路的混合不能正常工作。
每个 运行 显示并行处理工作但在找到项目后立即找到时不会终止!!!

    class TxnType {
        public String id;   
        public TxnType(String id) {this.id = id;}
       
        public boolean match(String entry) {
            Date s = new Date();
            // simulate long processing match time TxnType
            if (id.equals("1")) {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Date f = new Date();
            System.out.println("check id = " + id+ "  duration = "+(f.getTime()- s.getTime()));
    
            return id.equalsIgnoreCase(entry);
        }
    }

     private void test4() {
        // build list of available TxnTypes
        ArrayList<TxnType> lst = new ArrayList<>();
        lst.add(new TxnType("0"));
        lst.add(new TxnType("1"));  // long match processing time type
        lst.add(new TxnType("2"));
        lst.add(new TxnType("3"));
        lst.add(new TxnType("4"));

        String searchFor = "3";
        System.out.println("searchFor = " + searchFor);
        Date st, fi;

        st = new Date();
        Optional<TxnType> found2 =lst.stream().parallel().filter(txnType->txnType.match(searchFor)).findFirst();
            System.out.println("found.stream().count() = " + found2.stream().count());
            fi= new Date();
            System.out.println("dur="+ (fi.getTime()- st.getTime()));
    }

通过运行ning多次,发现没有尽快终止处理等待全部处理!!!

searchFor = 3
check id = 4  duration = 0
check id = 2  duration = 0
check id = 3  duration = 0
check id = 0  duration = 0
check id = 1  duration = 4005
found.stream().count() = 1
dur=4050

有没有类似FilterFindFirst()的东西?

你的错误是使用 findFirst,而不是 findAny

请注意,1 排在您希望找到的元素 (3) 之前。因此它必须先完成检查 1,然后才能得出结论“3 与谓词匹配的第一个 元素”,即使它们是并行完成的.如果它找到 3,并且还没有开始检查列表下方的内容,那么它不会开始检查。这就是findFirst中短路的意思。

findAny 另一方面,不关心顺序。如果它找到 any 满足谓词的元素,它就不会再开始检查任何新内容。

现在,即使您更改为findAny,您可能仍然会发现需要 4 秒才能完成。这是因为与流管道可以创建的线程数相比,列表中的元素太少了。于是开始对所有元素的处理,一旦开始,就不会被打断,即使已经找到满足谓词的元素。

如果将更多元素放入列表中:

for (int i = 0 ; i < 100 ; i++) {
    lst.add(new TxnType("foo"));
}

...

Optional<TxnType> found2 = lst.parallelStream().filter(txnType -> txnType.match(searchFor)).findAny();

那么 1 的处理就不太可能在 3 的处理完成之前开始,你会得到更快的 运行。不过,这不会每次都发生。无法保证 1 不会在 3.

之前得到处理

基本上,短路工作正常。就是这样

  • findFirst 不会像您希望的那样剧烈短路
  • 你的列表元素太少,而你的计算机有足够的核心来一次处理所有元素,所以它确实如此。

如果您查看 FindOps.FindTask.doLeaf 的源代码,您会发现,首先执行操作,然后才检查是否找到结果。

假设有 4 个核心,列表中的一个元素将传递给每个线程。一旦一个线程完成对一个元素的执行,它就会变得空闲并选择下一个可用元素(如果有的话)。在您的情况下,很可能在找到匹配项之前选择了列表的第 5 个元素进行执行。

如果您尝试使用更长的列表,您可能会看到短路在起作用。

1 - 包含 5 个元素的列表太小了,可能每个线程只处理一个元素,最多 2

2 - 因为第二个元素花费的时间更多,它很可能会在任何其他检查完成之前开始,也就是说,在找到任何元素之前- match()方法一旦启动,不会被中断

尝试使用更多元素:50 甚至 1001.
将长 运行ning 元素更改为第 10 个或更晚的元素。
对于 fun 还要将 Thread.currentThread().getId() 添加到 match() 的输出中。

你会看到

  • 即使使用findAny()2[=57,长运行ning线程并不总是启动 =]
  • 并非所有元素都经过检查

3 - 请注意:看到 StreamDate 混合很奇怪(我更喜欢使用 java.time 类)


1 - 用 50 个元素测试了自己,"9" 是最慢的,搜索 "15"(或 "3")
2 - 在使用 findAny() 而不是 findFirst() 时我看不出任何相关差异(none 如果只使用 5 个元素)(与我的第一个猜测相反) )
两者的检查顺序都是随机的,有时 findAny() 运行 是慢线程,而 findFirst() 不是