在 ParallelFlux 中发生错误时回滚所有更改
rolling back all changes when errors happen in ParallelFlux
我正在将 spring webflux 与 reactor 一起使用,我有以下用于上传图像、调整图像大小和存储图像的 flux。对于每种尺寸,我在自定义执行程序服务上并行执行所描述的通量。
任何方法 createDbAttachmentEntity
、resizeAttachment
、storeFile
都可以抛出各种异常。
并行执行调整大小意味着涉及的任何线程都可能抛出异常。这意味着我需要回滚所有更改,以防某些内容未正确执行。
例如,我可能有 5 个尺寸,但在 DB 中,系统只添加了 4 个和 5 个。或者我可能在转换我的流时出错。或者我的存储系统有问题。
如果出现异常,我希望能够回滚所有更改(手动删除数据库条目并手动删除文件。
我该怎么做?
Flux.just(sizes)
.parallel()
.runOn(Schedulers.fromExecutor(executorService))
.map(size -> createDbAttachmentEntity(size))
.map(size_attachment -> resizeAttachment(size_attachment.getT1(), originalBytes))
.map(size_attachment_bytes -> storeFile(...))
.sequential()
.collectList()
.map(list -> {
if(list.size() != sizes.length
|| list.stream().anyMatch(objects -> objects.getT2().getId() == null)) {
throw new RuntimeException();
}
return list;
})
.flux()
here .onErrorReturn(.......deleteEntities...........deleteFiles...........) // problem: I do not have the files/entities
.flatMap(list -> Flux.fromStream(list.stream()))
.collectMap(Tuple2::getT1, Tuple2::getT2);
我正在考虑用这个解决它,但它不起作用
Flux.just(1, 2, 3, 4, 5, 6, 7)
.map(integer -> {
if (integer == 3) throw new RuntimeException("3");
return integer;
})
.flatMap(integer -> Flux.just(integer)
.onErrorResume(t -> {
System.out.println("--onErrorResume" + integer); // here is what I need to pass in
return Flux.empty();
}))
如果我正确理解要求,可以做类似的事情
自定义异常:
public class FluxEntitiesException extends RuntimeException {
private Set<Entity> entities;
public FluxEntitiesException() {
super();
}
public FluxEntitiesException(String s) {
super(s);
}
public FluxEntitiesException(String message, Throwable cause) {
super(message, cause);
}
public FluxEntitiesException(Throwable cause) {
super(cause);
}
public FluxEntitiesException(Set<Entity> entities) {
super();
this.entities = entities;
}
public Set<Entity> getEntities() {
return entities;
}
static final long serialVersionUID = -1848914673093119416L;
}
示例处理器:
public void processError(FluxEntitiesException e){
for(Entity entity: e.entities){
//DO SOMETHING TO ROLL BACK
System.out.println("Rolling back: " + entity);
}
}
示例实体:
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Entity {
private Integer id;
//WHATEVER ELSE
}
回购方法示例:
public Entity addEntity(Entity entity) throws RuntimeException{
if(entity.getId() >4)
throw new RuntimeException("OH NO!");
System.out.println("Added entity" + entity);
return entity;
}
示例方法:
public void AddToDb(){
Set<Entity> entities = Collections.synchronizedSet(new HashSet<>());
Flux.just(1,2,3,4,5,6,7)
//Parallel stuff isn't needed was just an example since you use it in original
.parallel()
.runOn(Schedulers.boundedElastic())
.map(s -> addEntity(new Entity(s)))
.doOnNext(entities::add)
.sequential()
.doOnError(e -> processError(new FluxEntitiesException(entities)))
.collectList()
.subscribe(s -> System.out.println(entities));
}
我正在将 spring webflux 与 reactor 一起使用,我有以下用于上传图像、调整图像大小和存储图像的 flux。对于每种尺寸,我在自定义执行程序服务上并行执行所描述的通量。
任何方法 createDbAttachmentEntity
、resizeAttachment
、storeFile
都可以抛出各种异常。
并行执行调整大小意味着涉及的任何线程都可能抛出异常。这意味着我需要回滚所有更改,以防某些内容未正确执行。
例如,我可能有 5 个尺寸,但在 DB 中,系统只添加了 4 个和 5 个。或者我可能在转换我的流时出错。或者我的存储系统有问题。
如果出现异常,我希望能够回滚所有更改(手动删除数据库条目并手动删除文件。
我该怎么做?
Flux.just(sizes)
.parallel()
.runOn(Schedulers.fromExecutor(executorService))
.map(size -> createDbAttachmentEntity(size))
.map(size_attachment -> resizeAttachment(size_attachment.getT1(), originalBytes))
.map(size_attachment_bytes -> storeFile(...))
.sequential()
.collectList()
.map(list -> {
if(list.size() != sizes.length
|| list.stream().anyMatch(objects -> objects.getT2().getId() == null)) {
throw new RuntimeException();
}
return list;
})
.flux()
here .onErrorReturn(.......deleteEntities...........deleteFiles...........) // problem: I do not have the files/entities
.flatMap(list -> Flux.fromStream(list.stream()))
.collectMap(Tuple2::getT1, Tuple2::getT2);
我正在考虑用这个解决它,但它不起作用
Flux.just(1, 2, 3, 4, 5, 6, 7)
.map(integer -> {
if (integer == 3) throw new RuntimeException("3");
return integer;
})
.flatMap(integer -> Flux.just(integer)
.onErrorResume(t -> {
System.out.println("--onErrorResume" + integer); // here is what I need to pass in
return Flux.empty();
}))
如果我正确理解要求,可以做类似的事情
自定义异常:
public class FluxEntitiesException extends RuntimeException {
private Set<Entity> entities;
public FluxEntitiesException() {
super();
}
public FluxEntitiesException(String s) {
super(s);
}
public FluxEntitiesException(String message, Throwable cause) {
super(message, cause);
}
public FluxEntitiesException(Throwable cause) {
super(cause);
}
public FluxEntitiesException(Set<Entity> entities) {
super();
this.entities = entities;
}
public Set<Entity> getEntities() {
return entities;
}
static final long serialVersionUID = -1848914673093119416L;
}
示例处理器:
public void processError(FluxEntitiesException e){
for(Entity entity: e.entities){
//DO SOMETHING TO ROLL BACK
System.out.println("Rolling back: " + entity);
}
}
示例实体:
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Entity {
private Integer id;
//WHATEVER ELSE
}
回购方法示例:
public Entity addEntity(Entity entity) throws RuntimeException{
if(entity.getId() >4)
throw new RuntimeException("OH NO!");
System.out.println("Added entity" + entity);
return entity;
}
示例方法:
public void AddToDb(){
Set<Entity> entities = Collections.synchronizedSet(new HashSet<>());
Flux.just(1,2,3,4,5,6,7)
//Parallel stuff isn't needed was just an example since you use it in original
.parallel()
.runOn(Schedulers.boundedElastic())
.map(s -> addEntity(new Entity(s)))
.doOnNext(entities::add)
.sequential()
.doOnError(e -> processError(new FluxEntitiesException(entities)))
.collectList()
.subscribe(s -> System.out.println(entities));
}