使用 Java 8 流的 map 函数处理长 运行 任务

Use Java 8 stream's map function for long running tasks

我有一个方法将(远程文件的)URL 列表作为应该下载的参数。该方法 returns 一个其他类型的列表(称为附件),实际上包含一个 属性 的 java 文件类型。 对于这种情况,我使用 Java 流 API 遍历 URL 并在 "map" 函数中开始下载,然后 returns 附件实例。

现在我的问题是:我是否在滥用 Java 流 API 来做一些不该做的事情?比如把long 运行个任务放进去?我应该只对输入数据做一些小操作吗?

我现在看到的唯一缺点是它更难测试。

private List<Attachment> download(List<URL> attachments) {
        return attachments.stream().map(attachmentUrl -> {
            try {
                Attachment attachment = new Attachment();
                File attachmentFile = new File(getFilename(attachment.getAttachmentId(), attachmentUrl));
                FileUtils.copyURLToFile(
                        attachmentUrl,
                        attachmentFile,
                        CONNECT_TIMEOUT,
                        READ_TIMEOUT);
                attachment.setAttachmentFile(attachmentFile);
                return attachment;
            } catch (IOException e) {
                e.printStackTrace();
                LOGGER.error(e.getLocalizedMessage());
            }
            return null;
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

Streams 是以函数式编程方式处理数据的非常优雅的工具,相同的输入将产生相同的输出而没有副作用,这将使您的代码更不易出错且更具可读性。因此,无论输入的大小如何,在使用方面都没有滥用。如果您希望处理大量数据,则可以使用并行流。但是,您的实现可以使用一些清理,不要将所有业务逻辑委托给单个映射操作,使其更细化并将逻辑分散到多个映射器中,您可以像任何变量 Function<URL, File> urlToFileMapper = url -> {...} 一样声明映射器,并且将映射器插入流,attachments.stream().map(urlToFileMapper).map(anotherDeclaredMapper)...

我认为考虑 map 和其他功能结构(例如 filterreduce 等)可能会有所帮助,与其说是功能,不如说是 语法stream().map() 是执行 for 循环功能等效的语法。那么询问 "am I abusing this syntax because of what I use it to execute?" 就没那么有意义了:for 循环不关心它们 运行 在每次迭代中执行的任务需要多长时间,map 也不关心。它与它应用的 operation 无关,所以唯一的问题是你是否适当地使用了语法,即循环遍历集合,mapping 来自某些东西对某事。

在这种情况下,map 是语法,您想要的操作完全没问题。但是,您的 实现 可以稍微清理一下。

attachmentUrl -> {
    try {
        Attachment attachment = new Attachment();
        File attachmentFile = new File(getFilename(attachment.getAttachmentId(), attachmentUrl));
        FileUtils.copyURLToFile(
                attachmentUrl,
                attachmentFile,
                CONNECT_TIMEOUT,
                READ_TIMEOUT);
        attachment.setAttachmentFile(attachmentFile);
        return attachment;
    } catch (IOException e) {
        e.printStackTrace();
        LOGGER.error(e.getLocalizedMessage());
    }
    return null;
}

对于内联 map lambda 来说有点大。总的来说,我倾向于怀疑,但并不总是不赞成任何需要花括号的 map lambda,即占用不止一行。我建议将此 lambda 重构为一个命名函数,并且可能是一对嵌套函数(map(this::A),其中 A 然后调用 B)或由流操作连续使用 map(this::A).map(this::B).


[编辑:] 关于 stream 的并行化:请记住,作为此方法的一部分,您所做的不仅仅是 CPU 处理 - 您似乎在做网络 IO 文件IO。如果您并行执行,您将并行化的不仅是您的 CPU 利用率,还有您的网络和磁盘使用率。如果网络 磁盘是主导因素,而不是 CPU,那么并行化可能对您的帮助很小,并且可能使事情变得更糟。通常,更多线程 != 更快的网络或磁盘 read/write。您可能会发现 有用。