如何将 java.util.stream.Stream 写入 StreamingResponseBody 输出流
How to write java.util.stream.Stream to StreamingResponseBody output stream
我正在努力构建一个 REST API,其中来自 Oracle 数据库的大量数据可以通过流式传输到客户端应用程序(如文件下载或直接流)以块的形式发送。
我正在从 JpaRepository 获取 Stream,如下所示 -
@Query("select u from UsersEntity u")
Stream<UsersEntity> findAllByCustomQueryAndStream();
但现在挑战是将此流写入 StreamingResponseBody 输出流
试了很多方法都没有成功-
第一种方法 -
Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();
StreamingResponseBody stream = outputStream -> {
Iterator<UsersEntity> iterator = usersResultStream.iterator();
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
while (iterator.hasNext()) {
oos.write(iterator.next().toString().getBytes());
}
}
};
出现错误 -
java.sql.SQLException: Closed Resultset: next
at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:565) ~[ojdbc7-12.1.0.2.jar:12.1.0.2.0]
第二种方法 -
StreamingResponseBody stream = new StreamingResponseBody() {
@Transactional(readOnly = true)
@Override
public void writeTo(OutputStream outputStream) throws IOException {
Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
usersResultStream.forEach(user->{
try {
oos.write(user.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
};
出现错误 -
org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
我已经在下方上传了练习代码 link -
Sample POC Link
我没有任何流媒体相关任务的经验所以请帮助我。
如果我的方向错误,建议在 Spring 框架 中使用任何其他方法来做到这一点。请分享任何参考 links(如果有)。
没有示例显示 "such complex" StreamingResponseBody
的用法,我担心它是 "not possible"(至少我不能 manage/fix 它,使用 StreamingResponseBody and流查询)
...但是,什么是可能的:
在 StreamingResponseBody 中使用 findAll()
(正常的非流式 List-repo 方法)。
(但我理解 "need" 异步执行 Web 请求...和数据库请求 "streamed"...)
使用Callable
(异步网络请求)和一个@Async CompletableFuture<..>
(异步数据库请求):
@RestController
@RequestMapping("/api")
public class UsersController {
@Autowired
private UsersRepository usersRepository;
@GetMapping(value = "/async/users")
public Callable<List<UsersEntity>> fetchUsersAsync() {
Callable callable = () -> {
return usersRepository.readAllBy().get();
};
return callable;
}
}
..和存储库,如:
@Repository
public interface UsersRepository extends JpaRepository<UsersEntity, Integer> {
@Async
CompletableFuture<List<UsersEntity>> readAllBy();
}
(参见 spring-samples)
.. 别忘了 @EnableAsync
你的 application/configuration:
@org.springframework.scheduling.annotation.EnableAsync
@SpringBootApplication
public class Application { ... }
抱歉,这甚至不是答案,而是我的发现 - 太长了无法发表评论。
异步网络请求可以通过多种方式实现。 (见https://spring.io/blog/2012/05/10/spring-mvc-3-2-preview-making-a-controller-method-asynchronous/, https://niels.nu/blog/2016/spring-async-rest.html,甚至没有提到"reactive"api)
最后,我通过服务层解决了这个问题。最初,我在 Controller Class 中编写了完整的逻辑,这导致了问题。
控制器Class-
@RestController
@RequestMapping("/api")
public class UsersController {
@Autowired
private UserService service;
@GetMapping(value = "/userstream")
public ResponseEntity<StreamingResponseBody> fetchUsersStream() {
StreamingResponseBody stream = this::writeTo;
return new ResponseEntity<>(stream, HttpStatus.OK);
}
private void writeTo(OutputStream outputStream) {
service.writeToOutputStream(outputStream);
}
}
服务Class-
@Service
public class UserService {
@Autowired
private UsersRepository usersRepository;
@Transactional(readOnly = true)
public void writeToOutputStream(final OutputStream outputStream) {
try (Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream()) {
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
usersResultStream.forEach(emp -> {
try {
oos.write(emp.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
完整代码可在 github - https://github.com/bagesh2050/HttpResponseStreamingDemo
获得
不过,我愿意提供与 Http Streaming 相关的建议。如果您有更好的想法请提供。
我正在努力构建一个 REST API,其中来自 Oracle 数据库的大量数据可以通过流式传输到客户端应用程序(如文件下载或直接流)以块的形式发送。
我正在从 JpaRepository 获取 Stream,如下所示 -
@Query("select u from UsersEntity u")
Stream<UsersEntity> findAllByCustomQueryAndStream();
但现在挑战是将此流写入 StreamingResponseBody 输出流
试了很多方法都没有成功-
第一种方法 -
Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();
StreamingResponseBody stream = outputStream -> {
Iterator<UsersEntity> iterator = usersResultStream.iterator();
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
while (iterator.hasNext()) {
oos.write(iterator.next().toString().getBytes());
}
}
};
出现错误 -
java.sql.SQLException: Closed Resultset: next
at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:565) ~[ojdbc7-12.1.0.2.jar:12.1.0.2.0]
第二种方法 -
StreamingResponseBody stream = new StreamingResponseBody() {
@Transactional(readOnly = true)
@Override
public void writeTo(OutputStream outputStream) throws IOException {
Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
usersResultStream.forEach(user->{
try {
oos.write(user.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
};
出现错误 -
org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
我已经在下方上传了练习代码 link - Sample POC Link
我没有任何流媒体相关任务的经验所以请帮助我。
如果我的方向错误,建议在 Spring 框架 中使用任何其他方法来做到这一点。请分享任何参考 links(如果有)。
没有示例显示 "such complex" StreamingResponseBody
的用法,我担心它是 "not possible"(至少我不能 manage/fix 它,使用 StreamingResponseBody and流查询)
...但是,什么是可能的:
在 StreamingResponseBody 中使用
findAll()
(正常的非流式 List-repo 方法)。(但我理解 "need" 异步执行 Web 请求...和数据库请求 "streamed"...)
使用
Callable
(异步网络请求)和一个@Async CompletableFuture<..>
(异步数据库请求):@RestController @RequestMapping("/api") public class UsersController { @Autowired private UsersRepository usersRepository; @GetMapping(value = "/async/users") public Callable<List<UsersEntity>> fetchUsersAsync() { Callable callable = () -> { return usersRepository.readAllBy().get(); }; return callable; } }
..和存储库,如:
@Repository public interface UsersRepository extends JpaRepository<UsersEntity, Integer> { @Async CompletableFuture<List<UsersEntity>> readAllBy(); }
(参见 spring-samples) .. 别忘了
@EnableAsync
你的 application/configuration:@org.springframework.scheduling.annotation.EnableAsync @SpringBootApplication public class Application { ... }
抱歉,这甚至不是答案,而是我的发现 - 太长了无法发表评论。
异步网络请求可以通过多种方式实现。 (见https://spring.io/blog/2012/05/10/spring-mvc-3-2-preview-making-a-controller-method-asynchronous/, https://niels.nu/blog/2016/spring-async-rest.html,甚至没有提到"reactive"api)
最后,我通过服务层解决了这个问题。最初,我在 Controller Class 中编写了完整的逻辑,这导致了问题。
控制器Class-
@RestController
@RequestMapping("/api")
public class UsersController {
@Autowired
private UserService service;
@GetMapping(value = "/userstream")
public ResponseEntity<StreamingResponseBody> fetchUsersStream() {
StreamingResponseBody stream = this::writeTo;
return new ResponseEntity<>(stream, HttpStatus.OK);
}
private void writeTo(OutputStream outputStream) {
service.writeToOutputStream(outputStream);
}
}
服务Class-
@Service
public class UserService {
@Autowired
private UsersRepository usersRepository;
@Transactional(readOnly = true)
public void writeToOutputStream(final OutputStream outputStream) {
try (Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream()) {
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
usersResultStream.forEach(emp -> {
try {
oos.write(emp.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
完整代码可在 github - https://github.com/bagesh2050/HttpResponseStreamingDemo
获得不过,我愿意提供与 Http Streaming 相关的建议。如果您有更好的想法请提供。