下一个订阅者不包含完整的项目
Subscribers onnext does not contain complete item
我们正在与 project reactor
合作,现在遇到了一个大问题。这就是我们生产(发布我们的数据)的方式:
public Flux<String> getAllFlux() {
return Flux.<String>create(sink -> {
new Thread(){
public void run(){
Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator();
ObjectMapper mapper = new ObjectMapper();
while(iterator.hasNext()) {
try {
sink.next(mapper.writeValueAsString(iterator.next().getValue()));
} catch (IOException e) {
e.printStackTrace();
}
}
sink.complete();
}
} .start();
});
}
如您所见,我们正在从迭代器中获取数据,并将该迭代器中的每个项目作为 json 字符串发布。我们的订户执行以下操作:
flux.subscribe(new Subscriber<String>() {
private Subscription s;
int amount = 1; // the amount of received flux payload at a time
int onNextAmount;
String completeItem="";
ObjectMapper mapper = new ObjectMapper();
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe");
this.s = s;
this.s.request(amount);
}
@Override
public void onNext(String item) {
MyObject myObject = null;
try {
System.out.println(item);
myObject = mapper.readValue(completeItem, MyObject.class);
System.out.println(myObject.toString());
} catch (IOException e) {
System.out.println(item);
System.out.println("failed: " + e.getLocalizedMessage());
}
onNextAmount++;
if (onNextAmount % amount == 0) {
this.s.request(amount);
}
}
@Override
public void onError(Throwable t) {
System.out.println(t.getLocalizedMessage())
}
@Override
public void onComplete() {
System.out.println("completed");
});
}
如您所见,我们只是简单地打印我们收到的字符串项,并使用 jackson 包装器将其解析为一个对象。我们现在遇到的问题是,对于我们的大多数项目,一切正常:
{"itemId": "someId", "itemDesc", "some description"}
但对于某些项目,字符串会像这样被截断,例如:
{"itemId": "some"
之后的下一项是
"Id", "itemDesc", "some description"}
这些剪裁没有模式。它是完全随机的,每次我们 运行 编码时都不一样。当然,我们的 jackson 因该行为而出错 Unexpected end of Input
。
那么是什么导致了这种行为,我们该如何解决呢?
解决方案:
在 flux 中发送对象而不是字符串:
public Flux<ItemIgnite> getAllFlux() {
return Flux.create(sink -> {
new Thread(){
public void run(){
Iterator<Cache.Entry<String, ItemIgnite>> iterator = getAllIterator();
while(iterator.hasNext()) {
sink.next(iterator.next().getValue());
}
}
} .start();
});
}
并使用以下 produces
类型:
@RequestMapping(value="/allFlux", method=RequestMethod.GET, produces="application/stream+json")
这里的关键是使用 stream+json
而不仅仅是 json
。
我们正在与 project reactor
合作,现在遇到了一个大问题。这就是我们生产(发布我们的数据)的方式:
public Flux<String> getAllFlux() {
return Flux.<String>create(sink -> {
new Thread(){
public void run(){
Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator();
ObjectMapper mapper = new ObjectMapper();
while(iterator.hasNext()) {
try {
sink.next(mapper.writeValueAsString(iterator.next().getValue()));
} catch (IOException e) {
e.printStackTrace();
}
}
sink.complete();
}
} .start();
});
}
如您所见,我们正在从迭代器中获取数据,并将该迭代器中的每个项目作为 json 字符串发布。我们的订户执行以下操作:
flux.subscribe(new Subscriber<String>() {
private Subscription s;
int amount = 1; // the amount of received flux payload at a time
int onNextAmount;
String completeItem="";
ObjectMapper mapper = new ObjectMapper();
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe");
this.s = s;
this.s.request(amount);
}
@Override
public void onNext(String item) {
MyObject myObject = null;
try {
System.out.println(item);
myObject = mapper.readValue(completeItem, MyObject.class);
System.out.println(myObject.toString());
} catch (IOException e) {
System.out.println(item);
System.out.println("failed: " + e.getLocalizedMessage());
}
onNextAmount++;
if (onNextAmount % amount == 0) {
this.s.request(amount);
}
}
@Override
public void onError(Throwable t) {
System.out.println(t.getLocalizedMessage())
}
@Override
public void onComplete() {
System.out.println("completed");
});
}
如您所见,我们只是简单地打印我们收到的字符串项,并使用 jackson 包装器将其解析为一个对象。我们现在遇到的问题是,对于我们的大多数项目,一切正常:
{"itemId": "someId", "itemDesc", "some description"}
但对于某些项目,字符串会像这样被截断,例如:
{"itemId": "some"
之后的下一项是
"Id", "itemDesc", "some description"}
这些剪裁没有模式。它是完全随机的,每次我们 运行 编码时都不一样。当然,我们的 jackson 因该行为而出错 Unexpected end of Input
。
那么是什么导致了这种行为,我们该如何解决呢?
解决方案:
在 flux 中发送对象而不是字符串:
public Flux<ItemIgnite> getAllFlux() {
return Flux.create(sink -> {
new Thread(){
public void run(){
Iterator<Cache.Entry<String, ItemIgnite>> iterator = getAllIterator();
while(iterator.hasNext()) {
sink.next(iterator.next().getValue());
}
}
} .start();
});
}
并使用以下 produces
类型:
@RequestMapping(value="/allFlux", method=RequestMethod.GET, produces="application/stream+json")
这里的关键是使用 stream+json
而不仅仅是 json
。