Stream fetched from Postgres with jOOQ not returning results from class
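Problem
I am trying to stream the results of a Postgres query to a frontend application instead of eagerly fetching all of them. The problem is that I can only see the streamed results in my terminal (first in "org.jooq.tools.LoggerListener : Record fetched: ..." and then via stream.get().forEach(s -> debug)), while the class that references this stream yields only null values when it is asked for the ResultSet in the frontend.
This data may also be used for other tasks (e.g. visualization, download/export, summary statistics). I have been going through the documentation and posts about jOOQ, which I use as my ORM, and I am trying to use its lazy fetching methods (fetchStream() in the code below).
Eager fetching currently works perfectly, but it returns everything in one giant ResponseEntity and does not stream the results.
Current classes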
DataController.java
@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {
    @Autowired private QueryService queryService;
    @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    @ApiOperation(value = "Query the data")
    @ResponseStatus(HttpStatus.CREATED)
    public ResponseEntity<QueryResult> getQueryResults(
            @RequestBody @ValidQuery Query query, HttpServletRequest request) {
        QueryResult res = queryService.search(query);
        return ResponseEntity.ok(res);
    }
    // ...
}
QueryResult.java
public class QueryResult {
    // Field implied by the constructor and accessors below
    private Stream<Record> result;
    public QueryResult(Stream<Record> result) {
        this.result = result;
    }
    // public List<Map<String, Object>> getResult() { return result; }
    @JsonProperty("result")
    public Stream<Record> getResult() { return result; }
    // public void setResult(List<Map<String, Object>> result) { this.result = result; }
    public void setResult(Stream<Record> result) { this.result = result; }
}
QueryService.java
@Service
public class QueryService implements SearchService {
    @Autowired DefaultDSLContext dslContext;
    public QueryResult search(Query query) {
        LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();
        // Build selected fields
        List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);
        // Current support is for a single query. All others passed will be ignored
        List<Filter> filters = query.getFilters();
        Filter leadingFilter = QueryUtils.getLeadingFilter(filters);
        // Build "where" conditions
        Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);
        // Get "from" statement
        Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource, query.getJoins());
        /*
        // Works fine, but is not lazy fetching
        List<Map<String, Object>> results =
            dslContext
                .select(selectFields)
                .from(fromClause)
                .where(conditionClause)
                .limit(query.getOffset(), query.getLimit())
                .fetchMaps();
        */
        // Appears to work only once.
        // Cannot see any results returned, but the number of records is correct.
        // Everything in the records is null / undefined in the frontend
        Supplier<Stream<Record>> results = () ->
            dslContext
                .select(selectFields)
                .from(fromClause)
                .where(conditionClause)
                .limit(query.getOffset(), query.getLimit())
                .fetchStream();
        // "stream has already been operated upon or closed" is returned when using a Supplier
        results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));
        return new QueryResult(results.get());
    }
}
Query.java
public class Query {
    @NotNull(message = "Query must contain selection(s)")
    private LinkedHashMap<DataSourceName, List<String>> selections;
    private List<Filter> filters;
    private List<Join> joins;
    private List<Sort> sorts;
    private long offset;
    private int limit;
    private QueryOptions options;
    @JsonProperty("selections")
    public LinkedHashMap<DataSourceName, List<String>> getSelections() {
        return selections;
    }
    public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
        this.selections = selections;
    }
    @JsonProperty("filters")
    public List<Filter> getFilters() {
        return filters;
    }
    public void setFilters(List<Filter> filters) {
        this.filters = filters;
    }
    @JsonProperty("joins")
    public List<Join> getJoins() {
        return joins;
    }
    public void setJoins(List<Join> joins) {
        this.joins = joins;
    }
    @JsonProperty("sorts")
    public List<Sort> getSorts() {
        return sorts;
    }
    public void setSorts(List<Sort> sorts) {
        this.sorts = sorts;
    }
    @JsonProperty("options")
    public QueryOptions getOptions() {
        return options;
    }
    public void setOptions(QueryOptions options) {
        this.options = options;
    }
    @JsonProperty("offset")
    public long getOffset() {
        return offset;
    }
    public void setOffset(long offset) {
        this.offset = offset;
    }
    @JsonProperty("limit")
    public int getLimit() {
        return limit;
    }
    public void setLimit(int limit) {
        this.limit = limit;
    }
    @Override
    public String toString() {
        return "Query{"
            + "selections=" + selections
            + ", filters=" + filters
            + ", sorts=" + sorts
            + ", offSet=" + offset
            + ", limit=" + limit
            + ", options=" + options
            + '}';
    }
}
DataApi.js
// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;
Data.jsx
// ...
// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
    const response = await dataApi.post('/v3/data', query);
} catch (error) {
    // ...
}
// ...
Results returned in the console
{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
    data:
        result: Array(100)
            0: {}
            1: {}
            2: {}
            3: {}
            ...
Stack:
- Docker : 19.03.5
- Spring Boot : v2.1.8.RELEASE
- Node : v12.13.1
- React : 16.9.0
- OpenJDK : 12.0.2
- jOOQ : 3.12.3
- postgres : 10.7
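Answer
The whole point of the Java Stream API is that such a stream is consumed at most once. It has no buffering feature, nor does it support a push-based streaming model the way reactive stream implementations do.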
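The single-use rule can be reproduced without jOOQ or a database. The following is a minimal sketch using only the JDK; the class name SingleUseStreamDemo and the sample rows are made up for illustration. It runs one terminal operation on a stream and then tries a second one on the same instance.

```java
import java.util.stream.Stream;

class SingleUseStreamDemo {

    /** Returns true if a second terminal operation on the same stream is rejected. */
    static boolean secondUseFails(Stream<String> stream) {
        stream.forEach(s -> { });       // first terminal operation consumes the stream
        try {
            stream.forEach(s -> { });   // second terminal operation on the same instance
            return false;
        } catch (IllegalStateException e) {
            // The JDK reports: "stream has already been operated upon or closed"
            return true;
        }
    }

    public static void main(String[] args) {
        Stream<String> rows = Stream.of("row-1", "row-2", "row-3");
        System.out.println("second use rejected: " + secondUseFails(rows));
    }
}
```

A Supplier that builds a new stream on every get() sidesteps the exception, but each call then starts from the source again, which for a query-backed stream presumably means executing the query once per call.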
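You could add another API to your stack, such as Reactor (there are others, but since you are already using Spring...), which supports buffering and replaying streams to several consumers. That has nothing to do with jOOQ directly, though, and it will heavily influence your application's architecture.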
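To make "buffering and replaying" concrete without pulling in Reactor, here is a plain-JDK stand-in, not Reactor's API: it drains a stream once into a list and hands out a fresh stream to every consumer. The class name BufferedRows is hypothetical. Note that this is eager, so it only makes sense when the result is bounded (for example by a limit) and fits in memory.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BufferedRows<T> {
    private final List<T> buffer;

    /** Drains the source exactly once and closes it afterwards. */
    BufferedRows(Stream<T> source) {
        // try-with-resources closes the stream; a database-backed stream
        // would typically hold an open cursor until it is closed
        try (Stream<T> s = source) {
            this.buffer = s.collect(Collectors.toList());
        }
    }

    /** Each call returns a new stream over the buffered items. */
    Stream<T> stream() {
        return buffer.stream();
    }

    public static void main(String[] args) {
        BufferedRows<String> rows = new BufferedRows<>(Stream.of("row-1", "row-2", "row-3"));
        rows.stream().forEach(r -> System.out.println("log: " + r));   // first consumer
        System.out.println("count: " + rows.stream().count());         // second consumer
    }
}
```

The trade-off is the one the question wants to avoid: all rows are held in memory before anything is sent, which is why a reactive library is suggested for true incremental delivery.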
Note that jOOQ's ResultQuery extends org.reactivestreams.Publisher and JDK 9's Flow.Publisher, for better interoperability with such reactive streams.
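For readers unfamiliar with the push model, the sketch below shows the JDK 9 Flow API on its own. It uses the JDK's SubmissionPublisher as a stand-in publisher and does not wire in jOOQ; the class names FlowPushDemo and CollectingSubscriber and the sample rows are made up for illustration. Two subscribers each receive every item that is pushed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowPushDemo {

    /** Collects every item pushed to it and signals when the publisher is done. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1);                 // back-pressure: ask for one item at a time
        }
        @Override public void onNext(T item) {
            received.add(item);
            subscription.request(1);      // ready for the next item
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Pushes the rows to two independent subscribers and returns what each one received. */
    static List<List<String>> publishToTwo(List<String> rows) {
        CollectingSubscriber<String> a = new CollectingSubscriber<>();
        CollectingSubscriber<String> b = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(a);
            publisher.subscribe(b);
            rows.forEach(publisher::submit);   // blocks while a subscriber's buffer is full
        }                                      // close() delivers onComplete to subscribers
        try {
            a.done.await(5, TimeUnit.SECONDS);
            b.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for subscribers", e);
        }
        return List.of(a.received, b.received);
    }

    public static void main(String[] args) {
        System.out.println(publishToTwo(List.of("row-1", "row-2", "row-3")));
    }
}
```

Unlike a java.util.stream.Stream, the publisher drives delivery and several subscribers can be attached, which is the property a single-use stream lacks.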