不能并行工作的自定义收集器

Custom Collector that cannot work in parallel

我制作了一个使用 MessageDigest 来创建散列的自定义收集器。一般来说,MessageDigest 不会并行工作。我看到的问题出在 combiner() 方法中。不可能组合两个 MessageDigest 对象。当我 return null 它似乎工作但如果我抛出 UnsupportedOperationException 它失败。实现不支持并行操作的收集器的典型方法是什么?

class ChecksumCollector implements Collector<String, MessageDigest, ByteBuffer> {
    private String algorithm;

    ChecksumCollector(final String algorithm) {
        this.algorithm = algorithm;
    }

    @Override
    public Supplier<MessageDigest> supplier() {
        return () -> {
            try {
                return MessageDigest.getInstance(algorithm);
            } catch (NoSuchAlgorithmException e) {
                throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
            }
        };
    }

    @Override
    public BiConsumer<MessageDigest, String> accumulator() {
        return (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public BinaryOperator<MessageDigest> combiner() {
        return null; //seems to work but hash may not be correct?
        //throw new UnsupportedOperationException(LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
    }

    @Override
    public Function<MessageDigest, ByteBuffer> finisher() {
        return md -> ByteBuffer.wrap(md.digest());
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Set.of();
    }
}

A CollectorBinaryOperator returned by combiner() 将仅在用于并行流时使用,但是 combiner() 方法本身将在 JDK 的实现中调用 Stream.collect() 检索该组合器时被调用(参见 ReduceOps.makeRef(Collector))。

因此您有 2 个选择:

  • 或者return null,这会导致NullPointerException如果你的收集器在并行流中使用,此时需要使用组合器;
  • 或 return 调用时实际抛出异常的 BinaryOperator
return (a, b) -> throw new UnsupportedOperationException(
    LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");

第二个选项对于后来更改您的管道以使其并行的不知情的开发人员更友好。

虽然并行使用 MessageDigest 没有意义,但是如果管道中有一些操作可以并行化(并且数据集有可能 足够大支付使用并行流的开销) 你可以考虑创建这个收集器的两个版本的选项。

中已经提供了针对您的顺序实施的补救措施。提供一个适当的二元运算符抛出异常比简单地返回 null.

要好得多

对于此收集器的并行版本,来自流的数据最初可以收集到 辅助可变容器 ,然后 finisher 函数将填充 MessageDigest 与容器中的数据。我想重申,只有 才有意义 如果 collect 由一些可以并行化的操作主持并且输入足够重要,否则它只会导致不必要的开销。

这就是并行收集器的样子

     Collector<String, ?, ByteBuffer> checksum =
                Collectors.collectingAndThen(Collectors.toList(), 
                                             list -> digestAndWrap(list, "SHA-512"));
    private static ByteBuffer digestAndWrap(List<String> list, String algorithm) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
        }
        for (String next: list) {
            md.update(next.getBytes(StandardCharsets.UTF_8));
        }
        return ByteBuffer.wrap(md.digest());
    }

如果你想完成这项工作,你需要通过间接减少函数而不是直接减少消息摘要来重新获得关联性。试试这个:

Collector<String, ?, ByteBuffer> checksumCollector = collectingAndThen(
    reducing(
        Function.identity(),
        s -> md -> {
            md.update(s.getBytes(StandardCharsets.UTF_8));
            return md;
        },
        (BinaryOperator<Function<MessageDigest, MessageDigest>>) Function::andThen),
    endo -> ByteBuffer.wrap(endo.apply(getMessageDigest("SHA-256")).digest()));

其中 getMessageDigest() 是一个处理检查异常的小辅助方法:

static MessageDigest getMessageDigest(String algorithm) {
    try {
        return MessageDigest.getInstance(algorithm);
    } catch (NoSuchAlgorithmException e) {
        throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
    }    
}

这实际上将消息摘要的实际计算推迟到整理器。