下一个订阅者不包含完整的项目

Subscribers onnext does not contain complete item

我们正在与 project reactor 合作,现在遇到了一个大问题。这就是我们生产(发布我们的数据)的方式:

public Flux<String> getAllFlux() {
    return Flux.<String>create(sink -> {
        new Thread(){
            public void run(){
                Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator();
                ObjectMapper mapper = new ObjectMapper();

                while(iterator.hasNext()) {
                    try {
                        sink.next(mapper.writeValueAsString(iterator.next().getValue()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                sink.complete();
            }
        } .start();
    });
}

如您所见,我们正在从迭代器中获取数据,并将该迭代器中的每个项目作为 json 字符串发布。我们的订户执行以下操作:

flux.subscribe(new Subscriber<String>() {
    private Subscription s;

    int amount = 1; // the amount of received flux payload at a time
    int onNextAmount;

    String completeItem="";

    ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("subscribe");

        this.s = s;
        this.s.request(amount);
    }

    @Override
    public void onNext(String item) {
        MyObject myObject = null;

        try {
            System.out.println(item);

            myObject = mapper.readValue(completeItem, MyObject.class);

            System.out.println(myObject.toString());
        } catch (IOException e) {
            System.out.println(item);
            System.out.println("failed: " + e.getLocalizedMessage());
        }

        onNextAmount++;

        if (onNextAmount % amount == 0) {
            this.s.request(amount);
        }
    }

    @Override
    public void onError(Throwable t) {
        System.out.println(t.getLocalizedMessage())
    }

    @Override
    public void onComplete() {
        System.out.println("completed");
    });
}

如您所见,我们只是简单地打印我们收到的字符串项,并使用 jackson 包装器将其解析为一个对象。我们现在遇到的问题是,对于我们的大多数项目,一切正常:

{"itemId": "someId", "itemDesc", "some description"}

但对于某些项目,字符串会像这样被截断,例如:

{"itemId": "some"

之后的下一项是

"Id", "itemDesc", "some description"}

这些剪裁没有模式。它是完全随机的,每次我们 运行 编码时都不一样。当然,我们的 jackson 因该行为而出错 Unexpected end of Input

那么是什么导致了这种行为,我们该如何解决呢?

解决方案:

在 flux 中发送对象而不是字符串:

public Flux<ItemIgnite> getAllFlux() {
   return Flux.create(sink -> {
       new Thread(){
           public void run(){
               Iterator<Cache.Entry<String, ItemIgnite>> iterator = getAllIterator();

               while(iterator.hasNext()) {
                   sink.next(iterator.next().getValue());
               }
           }
       } .start();
   });
} 

并使用以下 produces 类型:

@RequestMapping(value="/allFlux", method=RequestMethod.GET, produces="application/stream+json")

这里的关键是使用 stream+json 而不仅仅是 json