Dropwizard 中的 JAX-RS:通过即时响应处理异步调用

JAX-RS in Dropwizard: Handling async call with immediate response

我有一个资源 Class,它有一个 @ManagedAsync 方法 Class,它看起来像这样:

@Path("my-resource")
public class MyResource extends BaseResource{

    private DatumDAO datumDAO;

    public MyResource(DatumDAO datumDAO){
        this.datumDAO = datumDAO;
    }

    public void cleanDatum(Datum datum){
       //time taking operations
    }

    @GET
    @ManagedAsync
    @Path("/cleanup/{from}/{till}/")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    @UnitOfWork
    public void cleanupDirtyData(@Suspended final AsyncResponse asyncResponse, @PathParam("from") DateTimeParam from,
            @PathParam("till") DateTimeParam till) throws IOException{

        logger.debug("will try to cleanup dirty data in range: " + from + " " +  till);
        List<Datum> data = datumDAO.getALlDirtyDatumInRange(from.get().toDate(), till.get().toDate());
        Map<Long,String> cleanupMap = new HashMap<Long,String>();
        for(Datum datum: data){
            cleanDatum(datum);
            cleanupMap.put(datum.getId(), "cleaned");
        }
        // this response need to be sent [can be ignored]       
        asyncResponse.resume(Response.status(HttpStatus.OK_200).entity(cleanupMap).build());

    }

}

由于调用 cleanupDirtyData 可能需要一段时间,我不希望客户完全等待它,我知道执行工作被卸载到不同的工作线程。

我想要实现的是立即响应客户端并继续异步执行函数cleanupDirtyData

所以尝试了以下操作:

强制超时,并过早响应客户端,但这似乎不是理想的方式,它会停止执行。

看起来像这样:

@Path("my-resource")
public class MyResource extends BaseResource{

    private DatumDAO datumDAO;

    public MyResource(DatumDAO datumDAO){
        this.datumDAO = datumDAO;
    }

    public void cleanDatum(Datum datum){
       //time taking operations
    }

    @GET
    @ManagedAsync
    @Path("/cleanup/{from}/{till}/")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    @UnitOfWork
    public void cleanupDirtyData(@Suspended final AsyncResponse asyncResponse, @PathParam("from") DateTimeParam from,
            @PathParam("till") DateTimeParam till) throws IOException{

        // Register handler and set timeout
        asyncResponse.setTimeoutHandler(new TimeoutHandler() {
            public void handleTimeout(AsyncResponse ar) {
                asyncResponse.resume(Response.status(SERVICE_UNAVAILABLE).entity(
                    "Operation timed out -- please try again").build());                    
                }
        });
        ar.setTimeout(15, TimeUnit.SECONDS);      

        logger.debug("will try to cleanup dirty data in range: " + from + " " +  till);
        List<Datum> data = datumDAO.getALlDirtyDatumInRange(from.get().toDate(), till.get().toDate());
        Map<Long,String> cleanupMap = new HashMap<Long,String>();
        for(Datum datum: data){
            cleanDatum(datum);
            cleanupMap.put(datum.getId(), "cleaned");
        }
       // this response need to be sent [can be ignored]              
        asyncResponse.resume(Response.status(HttpStatus.OK_200).entity(cleanupMap).build());

    }

}

JAX-RS 异步服务器 API 是关于 容器 如何管理请求的。但它仍然会保留请求,不会影响客户端体验。

引用关于 Asynchronous Server API 的 Jersey 文档:

Note that the use of server-side asynchronous processing model will not improve the request processing time perceived by the client. It will however increase the throughput of the server, by releasing the initial request processing thread back to the I/O container while the request may still be waiting in a queue for processing or the processing may still be running on another dedicated thread. The released I/O container thread can be used to accept and process new incoming request connections.

如果您想立即给客户答复,您可能正在寻找类似的东西:

@Singleton
@Path("expensive-task")
public class ExpensiveTaskResource {

    private ExecutorService executor;

    private Future<String> futureResult;

    @PostConstruct
    public void onCreate() {
        this.executor = Executors.newSingleThreadExecutor();
    }

    @POST
    public Response startTask() {
        futureResult = executor.submit(new ExpensiveTask());
        return Response.status(Status.ACCEPTED).build();
    }

    @GET
    public Response getResult() throws ExecutionException, InterruptedException {
        if (futureResult != null && futureResult.isDone()) {
            return Response.status(Status.OK).entity(futureResult.get()).build();
        } else {
            return Response.status(Status.FORBIDDEN).entity("Try later").build();
        }
    }

    @PreDestroy
    public void onDestroy() {
        this.executor.shutdownNow();
    }
}
public class ExpensiveTask implements Callable<String> {

    @Override
    public String call() throws Exception {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "Task completed";
    }
}

在 servlet 容器中,您可以使用 ExecutorService to run your expensive task. In a Java EE container, you should consider a ManagedExecutorService