如何为 Java 11 HttpRequest 创建自定义 BodyPublisher

How to create a custom BodyPublisher for Java 11 HttpRequest

我正在尝试创建一个自定义 BodyPublisher 来反序列化我的 JSON 对象。我可以在创建请求时反序列化 JSON 并使用 BodyPublishersofByteArray 方法,但我宁愿使用自定义发布者。

public class CustomPublisher implements HttpRequest.BodyPublisher {
    private byte[] bytes;
    
    public CustomPublisher(ObjectNode jsonData) {
        ...
        // Deserialize jsonData to bytes
        ...
    }
    
    @Override
    public long contentLength() {
        if(bytes == null) return 0;
        return bytes.length
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);       
    }

    private CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n < 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

此实现有效,但前提是使用 1 作为参数调用订阅 request 方法。但这就是我将它与 HttpRequest 一起使用时发生的情况。

我很确定这不是创建自定义订阅的首选或最佳方式,但我还没有找到更好的方法来实现它。

如果有人能引导我走上更好的道路,我将不胜感激。

您避免从中创建字节数组是正确的,因为这会为大对象造成内存问题。

我不会尝试编写自定义发布器。相反,只需利用工厂方法 HttpRequest.BodyPublishers.ofInputStream.

HttpRequest.BodyPublisher publisher =
    HttpRequest.BodyPublishers.ofInputStream(() ->  {
        PipedInputStream in = new PipedInputStream();

        ForkJoinPool.commonPool().submit(() -> {
            try (PipedOutputStream out = new PipedOutputStream(in)) {
                objectMapper.writeTree(
                    objectMapper.getFactory().createGenerator(out),
                    jsonData);
            }
            return null;
        });

        return in;
    });

如您所见,您可以使用 HttpRequest.BodyPublishers.ofByteArray。这对于相对较小的对象来说很好,但我出于习惯为可伸缩性编程。假设代码不需要扩展的问题是其他开发人员会认为传递大对象是安全的,而没有意识到对性能的影响。

编写自己的 body publisher 需要大量工作。它的subscribe方法继承自Flow.Publisher.

subscribe 方法的文档以此开头:

Adds the given Subscriber if possible.

每次你的 subscribe 方法被调用时,你需要将订阅者添加到某种集合中,你需要创建一个 Flow.Subscription, and you need to immediately pass it to the subscriber’s onSubscribe method. Your Subscription implementation object needs to send back one or more ByteBuffers, only when the Subscription’s request method is called, by invoking the corresponding Subscriber’s (not just any Subscriber’s) onNext 方法的实现,一旦你发送了所有的数据,您必须调用同一订阅者的 onComplete() 方法。最重要的是,订阅实现对象需要处理 cancel 个请求。

您可以通过扩展 SubmissionPublisher,这是 Flow.Publisher 的默认实现,然后向其添加 contentLength() 方法,使这一切变得更容易。但正如 SubmissionPublisher 文档所示,即使是最小的工作实施,您仍有大量工作要做。

HttpRequest.BodyPublishers.of…方法将为您完成所有这些工作。 ofByteArray 适用于小对象,但 ofInputStream 适用于您可以传入的任何对象。