Reading from a JDBC Blob after leaving the Spring transaction

I have the following schematic implementation of a JAX-RS service endpoint:

@GET
@Path("...")
@Transactional
public Response download() {
    java.sql.Blob blob = findBlob(...);
    return Response.ok(blob.getBinaryStream()).build();
}

Invoking the JAX-RS endpoint fetches a Blob from the database (via JPA) and streams the result back to the HTTP client. The point of using a Blob and a stream, rather than e.g. JPA's plain BLOB-to-byte[] mapping, is to avoid having to hold all of the data in memory; instead it is streamed directly from the database to the HTTP response.
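
As a hypothetical illustration of that mapping choice (the entity and column names are assumptions, not taken from my code), the difference looks roughly like this:

import java.sql.Blob;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;

// Hypothetical entity for illustration only.
@Entity
public class Image {

    @Id
    private Long id;

    // Plain byte[] mapping: JPA materializes the whole BLOB in memory.
    // @Lob
    // private byte[] data;

    // java.sql.Blob mapping: the data can be streamed via getBinaryStream().
    @Lob
    private Blob data;

    public Blob getData() {
        return data;
    }
}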

This works as expected, and I actually don't understand why. Isn't the Blob handle I get from the database tied to the underlying JDBC connection and transaction? If so, I would expect the Spring transaction to be committed when I return from the download() method, making it impossible for the JAX-RS implementation to later access the data from the Blob in order to stream it back in the HTTP response.

Are you sure that the transactional advice is running? By default, Spring uses the "proxy" advice mode. The advice only applies if you registered the Spring proxy instance of your resource with the JAX-RS Application, or if you use "aspectj" weaving instead of the default "proxy" advice mode.
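
As a minimal sketch (not from the original answer), switching to AspectJ weaving could look roughly like this; it assumes spring-aspects and load-time or compile-time weaving are configured:

import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// Sketch: with mode = ASPECTJ the @Transactional behavior is woven into the
// class itself, so it also applies when JAX-RS invokes the resource instance
// directly instead of going through a Spring proxy.
@Configuration
@EnableTransactionManagement(mode = AdviceMode.ASPECTJ)
public class TransactionConfig {
}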

Assuming that the physical transaction is not being re-used as a result of transaction propagation, putting @Transactional on this download() method is generally incorrect.

If the transactional advice is in fact running, then the transaction ends when the download() method returns. The Blob Javadoc says: "A Blob object is valid for the duration of the transaction in which is was created." However, section 16.3.7 of the JDBC 4.2 specification says: "Blob, Clob and NClob objects remain valid for at least the duration of the transaction in which they are created." Therefore, the InputStream returned by getBinaryStream() is not guaranteed to be valid for serving the response; its validity depends on whatever guarantees the JDBC driver provides. For maximum portability, you should rely on the Blob being valid only for the duration of the transaction.

Whether or not the transactional advice is running, there is also a potential race condition, because the underlying JDBC connection that was used to retrieve the Blob might be re-used in a way that invalidates the Blob.

EDIT: Testing with Jersey 2.17, it appears that the behavior of building a Response from an InputStream depends on the specified response MIME type. In some cases the InputStream is first read completely into memory before the response is sent. In other cases, the InputStream is streamed back.

Here is my test case:

@Path("test")
public class MyResource {

    @GET
    public Response getIt() {
        return Response.ok(new InputStream() {
            @Override
            public int read() throws IOException {
                return 97; // 'a'
            }
        }).build();
    }
}

If the getIt() method is annotated with @Produces(MediaType.TEXT_PLAIN) or has no @Produces annotation at all, then Jersey attempts to read the entire (endless) InputStream into memory and the application server eventually crashes because it runs out of memory. If the getIt() method is annotated with @Produces(MediaType.APPLICATION_OCTET_STREAM), then the response is streamed back.
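
For reference, the variant that streamed back differs only in the @Produces annotation (same test resource as above):

@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response getIt() {
    // Same endless InputStream as above; with application/octet-stream,
    // Jersey 2.17 streams the response instead of buffering it in memory.
    return Response.ok(new InputStream() {
        @Override
        public int read() throws IOException {
            return 97; // 'a'
        }
    }).build();
}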

So your download() method may only appear to work because the blob is not actually streamed back; Jersey may be reading the entire blob into memory.

Related: How to stream an endless InputStream with JAX-RS

EDIT2: I have created a demo project using Spring Boot and Apache CXF:
https://github.com/dtrebbien/so30356840-cxf

If you run the project and execute the following on the command line:

curl 'http://localhost:8080/myapp/test/data/1' >/dev/null

then you will see log output like the following:

2015-06-01 15:58:14.573 DEBUG 9362 --- [nio-8080-exec-1] org.apache.cxf.transport.http.Headers    : Request Headers: {Accept=[*/*], Content-Type=[null], host=[localhost:8080], user-agent=[curl/7.37.1]}

2015-06-01 15:58:14.584 DEBUG 9362 --- [nio-8080-exec-1] org.apache.cxf.jaxrs.utils.JAXRSUtils    : Trying to select a resource class, request path : /test/data/1
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] org.apache.cxf.jaxrs.utils.JAXRSUtils    : Trying to select a resource operation on the resource class com.sample.resource.MyResource
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] org.apache.cxf.jaxrs.utils.JAXRSUtils    : Resource operation getIt may get selected
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] org.apache.cxf.jaxrs.utils.JAXRSUtils    : Resource operation getIt on the resource class com.sample.resource.MyResource has been selected
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.j.interceptor.JAXRSInInterceptor   : Request path is: /test/data/1
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.j.interceptor.JAXRSInInterceptor   : Request HTTP method is: GET
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.j.interceptor.JAXRSInInterceptor   : Request contentType is: */*
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.j.interceptor.JAXRSInInterceptor   : Accept contentType is: */*
2015-06-01 15:58:14.585 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.j.interceptor.JAXRSInInterceptor   : Found operation: getIt

2015-06-01 15:58:14.595 DEBUG 9362 --- [nio-8080-exec-1] o.s.j.d.DataSourceTransactionManager     : Creating new transaction with name [com.sample.resource.MyResource.getIt]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
2015-06-01 15:58:14.595 DEBUG 9362 --- [nio-8080-exec-1] o.s.j.d.DataSourceTransactionManager     : Acquired Connection [ProxyConnection[PooledConnection[org.hsqldb.jdbc.JDBCConnection@7b191894]]] for JDBC transaction
2015-06-01 15:58:14.596 DEBUG 9362 --- [nio-8080-exec-1] o.s.j.d.DataSourceTransactionManager     : Switching JDBC Connection [ProxyConnection[PooledConnection[org.hsqldb.jdbc.JDBCConnection@7b191894]]] to manual commit
2015-06-01 15:58:14.602 DEBUG 9362 --- [nio-8080-exec-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2015-06-01 15:58:14.603 DEBUG 9362 --- [nio-8080-exec-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [SELECT data FROM images WHERE id = ?]
2015-06-01 15:58:14.620 DEBUG 9362 --- [nio-8080-exec-1] o.s.j.d.DataSourceTransactionManager     : Initiating transaction commit
2015-06-01 15:58:14.620 DEBUG 9362 --- [nio-8080-exec-1] o.s.j.d.DataSourceTransactionManager     : Committing JDBC transaction on Connection [ProxyConnection[PooledConnection[org.hsqldb.jdbc.JDBCConnection@7b191894]]]
2015-06-01 15:58:14.621 DEBUG 9362 --- [nio-8080-exec-1] o.s.j.d.DataSourceTransactionManager     : Releasing JDBC Connection [ProxyConnection[PooledConnection[org.hsqldb.jdbc.JDBCConnection@7b191894]]] after transaction
2015-06-01 15:58:14.621 DEBUG 9362 --- [nio-8080-exec-1] o.s.jdbc.datasource.DataSourceUtils      : Returning JDBC Connection to DataSource
2015-06-01 15:58:14.621 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Invoking handleMessage on interceptor org.apache.cxf.interceptor.OutgoingChainInterceptor@7eaf4562

2015-06-01 15:58:14.622 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Adding interceptor org.apache.cxf.interceptor.MessageSenderInterceptor@20ffeb47 to phase prepare-send
2015-06-01 15:58:14.622 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Adding interceptor org.apache.cxf.jaxrs.interceptor.JAXRSOutInterceptor@5714d386 to phase marshal
2015-06-01 15:58:14.622 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Chain org.apache.cxf.phase.PhaseInterceptorChain@11ca802c was created. Current flow:
  prepare-send [MessageSenderInterceptor]
  marshal [JAXRSOutInterceptor]

2015-06-01 15:58:14.623 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Invoking handleMessage on interceptor org.apache.cxf.interceptor.MessageSenderInterceptor@20ffeb47
2015-06-01 15:58:14.623 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Adding interceptor org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor@6129236d to phase prepare-send-ending
2015-06-01 15:58:14.623 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Chain org.apache.cxf.phase.PhaseInterceptorChain@11ca802c was modified. Current flow:
  prepare-send [MessageSenderInterceptor]
  marshal [JAXRSOutInterceptor]
  prepare-send-ending [MessageSenderEndingInterceptor]

2015-06-01 15:58:14.623 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Invoking handleMessage on interceptor org.apache.cxf.jaxrs.interceptor.JAXRSOutInterceptor@5714d386
2015-06-01 15:58:14.627 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.j.interceptor.JAXRSOutInterceptor  : Response content type is: application/octet-stream
2015-06-01 15:58:14.631 DEBUG 9362 --- [nio-8080-exec-1] o.apache.cxf.ws.addressing.ContextUtils  : retrieving MAPs from context property javax.xml.ws.addressing.context.inbound
2015-06-01 15:58:14.631 DEBUG 9362 --- [nio-8080-exec-1] o.apache.cxf.ws.addressing.ContextUtils  : WS-Addressing - failed to retrieve Message Addressing Properties from context
2015-06-01 15:58:14.636 DEBUG 9362 --- [nio-8080-exec-1] o.a.cxf.phase.PhaseInterceptorChain      : Invoking handleMessage on interceptor org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor@6129236d
2015-06-01 15:58:14.639 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.t.http.AbstractHTTPDestination     : Finished servicing http request on thread: Thread[http-nio-8080-exec-1,5,main]
2015-06-01 15:58:14.639 DEBUG 9362 --- [nio-8080-exec-1] o.a.c.t.servlet.ServletController        : Finished servicing http request on thread: Thread[http-nio-8080-exec-1,5,main]

I have trimmed the log output for readability. The important thing to notice is that the transaction is committed and the JDBC connection is released before the response is sent. Therefore, the InputStream returned by blob.getBinaryStream() is not necessarily valid, and the getIt() resource method may be invoking undefined behavior.

EDIT3: The recommended practice for using Spring's @Transactional annotation is to annotate a service method (see Spring @Transactional Annotation Best Practice). You could have a service method that finds the blob and transfers the blob data to the response OutputStream. The service method could be annotated with @Transactional so that the transaction in which the Blob is created would remain open for the duration of the transfer. However, it seems to me that this approach could introduce a denial of service vulnerability by way of a "slow read" attack: because the transaction should remain open for the duration of the transfer for maximum portability, many slow readers could lock up your database table(s) by holding transactions open.

A possible approach is to save the blob to a temporary file and stream back the file. See How do I use Java to read from a file that is actively being written? for some ideas on reading a file while it's being simultaneously written, though this case is more straightforward because the length of the blob can be determined by calling the Blob#length() method.
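
A rough sketch of that temporary-file approach could look like the following (findBlob and blobService are hypothetical names, error handling is omitted): the Blob is copied while the transaction is still open, and the file is streamed back after the transaction has ended:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.sql.Blob;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.springframework.transaction.annotation.Transactional;

// Transactional service method: copy the Blob to a temp file while the
// transaction (and therefore the Blob) is still guaranteed to be valid.
@Transactional
public java.nio.file.Path copyBlobToTempFile(long id) throws Exception {
    Blob blob = findBlob(id); // hypothetical lookup, as in the question
    java.nio.file.Path tmp = Files.createTempFile("blob-", ".bin");
    try (InputStream in = blob.getBinaryStream()) {
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
    }
    return tmp;
}

// JAX-RS resource method: by the time the StreamingOutput runs, the
// transaction has ended, but the temp file does not depend on the JDBC
// connection and can be streamed safely.
@GET
@Path("download/{id}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response download(@PathParam("id") long id) throws Exception {
    java.nio.file.Path tmp = blobService.copyBlobToTempFile(id); // blobService: assumed injected bean
    StreamingOutput out = output -> {
        Files.copy(tmp, output);
        Files.deleteIfExists(tmp); // clean up once the response has been written
    };
    return Response.ok(out).build();
}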

I have now spent some time debugging the code, and all of my assumptions in the question are more or less correct. The @Transactional annotation works as expected, the transaction (both the Spring and the database transaction) is committed right after returning from the download method, the physical database connection is returned to the connection pool, and the content of the BLOB is obviously read later and streamed to the HTTP response.

The reason why this works nevertheless is that the Oracle JDBC driver implements functionality beyond what the JDBC specification requires. As Daniel pointed out, the JDBC API documentation states "A Blob object is valid for the duration of the transaction in which is was created." The documentation only says that the Blob is valid during the transaction; it does not state (as claimed by Daniel and originally assumed by me) that the Blob becomes invalid after the end of the transaction.

Using plain JDBC, this behavior can be demonstrated by retrieving the InputStreams of two Blobs in two different transactions on the same physical connection, and not reading the Blob data until after the transactions have been committed:

// First transaction: select a Blob, obtain its stream, then commit
// without reading the data yet.
Connection conn = DriverManager.getConnection(...);
conn.setAutoCommit(false);

ResultSet rs = conn.createStatement().executeQuery("select data from ...");
rs.next();
InputStream is1 = rs.getBlob(1).getBinaryStream();
rs.close();
conn.commit();

// Second transaction on the same physical connection, same pattern.
rs = conn.createStatement().executeQuery("select data from ...");
rs.next();
InputStream is2 = rs.getBlob(1).getBinaryStream();
rs.close();
conn.commit();

// Both streams are only read here, after both transactions have ended.
int b1 = 0, b2 = 0;
while(is1.read()>=0) b1++;
while(is2.read()>=0) b2++;

System.out.println("Read " + b1 + " bytes from 1st blob");
System.out.println("Read " + b2 + " bytes from 2nd blob");

Both Blobs can be read completely, even though they were selected on the same physical connection and in two different transactions.

Closing the JDBC connection (conn.close()), however, does eventually invalidate the Blob streams.

I had a similar, related problem, and I can confirm that at least in my case PostgreSQL throws the exception Invalid large object descriptor : 0 when the StreamingOutput approach is used with autocommit. The reason is that when the Response is returned in JAX-RS, the transaction is committed, and the streaming method is executed later. In the meantime the file descriptor is no longer valid.

I created some helper methods so that the streaming part opens a new transaction and can stream the Blob. com.foobar.model.Blob is just a return class wrapping the blob, so that the complete entity does not have to be fetched. findByID is a method that uses a projection on the blob column and fetches only this column.

So StreamingOutput with JAX-RS and a Blob under JPA, together with Spring transactions, does work, but it has to be adapted. I guess the same applies to JPA with EJB.

// NOTE: has to run inside a transaction to be able to stream from the DB
@Transactional
public void streamBlobToOutputStream(OutputStream outputStream, Class entityClass, String id, SingularAttribute attribute) {
    BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
    try {
        com.foobar.model.Blob blob = fooDao.findByID(id, entityClass, com.foobar.model.Blob.class, attribute);
        if (blob.getBlob() == null) {
            return;
        }
        InputStream inputStream;
        try {
            inputStream = blob.getBlob().getBinaryStream();
        } catch (SQLException e) {
            throw new RuntimeException("Could not read binary data.", e);
        }
        IOUtils.copy(inputStream, bufferedOutputStream);
        // NOTE: the buffer must be flushed, otherwise data seems to be missing
        bufferedOutputStream.flush();
    } catch (Exception e) {
        throw new RuntimeException("Could not send data.", e);
    }
}

/**
 * Builds streaming response for data which can be streamed from a Blob.
 *
 * @param contentType        The content type. If <code>null</code> application/octet-stream is used.
 * @param contentDisposition The content disposition. E.g. naming of the file download. Optional.
 * @param entityClass        The entity class to search in.
 * @param id                 The Id of the entity with the blob field to stream.
 * @param attribute          The Blob attribute in the entity.
 * @return the response builder.
 */
protected Response.ResponseBuilder buildStreamingResponseBuilder(String contentType, String contentDisposition,
                                                                 Class entityClass, String id, SingularAttribute attribute) {
    StreamingOutput streamingOutput = new StreamingOutput() {

        @Override
        public void write(OutputStream output) throws IOException, WebApplicationException {
            streamBlobToOutputStream(output, entityClass, id, attribute);
        }
    };
    MediaType mediaType = MediaType.APPLICATION_OCTET_STREAM_TYPE;
    if (contentType != null) {
        mediaType = MediaType.valueOf(contentType);
    }
    Response.ResponseBuilder response = Response.ok(streamingOutput, mediaType);
    if (contentDisposition != null) {
        response.header("Content-Disposition", contentDisposition);
    }
    return response;
}

/**
 * Stream a blob from the database.
 * @param contentType        The content type. If <code>null</code> application/octet-stream is used.
 * @param contentDisposition The content disposition. E.g. naming of the file download. Optional.
 * @param currentBlob The current blob value of the entity.
 * @param entityClass The entity class to search in.
 * @param id          The Id of the entity with the blob field to stream.
 * @param attribute   The Blob attribute in the entity.
 * @return the response.
 */
@Transactional
public Response streamBlob(String contentType, String contentDisposition,
                           Blob currentBlob, Class entityClass, String id, SingularAttribute attribute) {
    if (currentBlob == null) {
        return Response.noContent().build();
    }
    return buildStreamingResponseBuilder(contentType, contentDisposition, entityClass, id, attribute).build();
}

I also have to add to my answer that there might be a problem with the behavior of Blobs under Hibernate. By default, Hibernate merges the complete entity with the DB even if only a single field was changed, i.e. if you update a field name and there is also a large Blob image that was not touched, the image will be updated as well. Even worse: before the merge, if the entity is detached, Hibernate has to fetch the Blob from the database to determine the dirty state. Because blobs cannot be compared byte by byte (too big), they are considered immutable and the equality comparison is based only on the blob's object reference. The object reference fetched from the database will be a different one, so although nothing has changed, the blob is updated again. At least that was the situation in my case. I used the @DynamicUpdate annotation on the entity and wrote a user type that handles blobs differently and checks whether an update is necessary.
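
A minimal sketch of the entity-level part of that workaround (entity and column names are assumptions; the custom user type mentioned above is not shown):

import java.sql.Blob;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import org.hibernate.annotations.DynamicUpdate;

// With @DynamicUpdate, Hibernate generates UPDATE statements that contain
// only the columns that actually changed, so updating 'name' alone does not
// rewrite the untouched 'image' BLOB column.
@Entity
@DynamicUpdate
public class Person {

    @Id
    private Long id;

    private String name;

    @Lob
    private Blob image;

    // getters and setters omitted
}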