Stream fetched from Postgres with jOOQ not returning results from class

Question

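I am trying to stream the results of a Postgres query to a frontend application instead of eagerly fetching all results. The problem is that I can only see the streamed results in my terminal (first in "org.jooq.tools.LoggerListener : Record fetched: ..." and then via stream.get().forEach(s -> debug)), whereas the class that references this stream produces only null values when the frontend asks to view the ResultSet.

This data may also be used for other tasks (e.g. visualization, download/export, summary statistics, etc.). I have been going through the documentation and posts about jOOQ, which I use as my ORM, and I am trying to use the following approach:

Eager fetching currently works perfectly, but it returns everything in one giant ResponseEntity and does not stream the results: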


Current classes

DataController.java

@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult.java

public class QueryResult {
  private Stream<Record> result;

  public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }


//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

QueryService.java

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

      // Appears to work only once. 
      // Cannot see any results returned, but the number of records is correct. 
      // Everything in the records is null / undefined in the frontend
      Supplier<Stream<Record>> results = () ->
              dslContext
                      .select(selectFields)
                      .from(fromClause)
                      .where(conditionClause)
                      .limit(query.getOffset(), query.getLimit())
                      .fetchStream();

      // "stream has already been operated upon or closed" is returned when using a Supplier
      results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

      return new QueryResult(results.get());

  }
}

Query.java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi.js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

Data.jsx

// ...

// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
      const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...

Result returned in the console

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...

Stack:

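Answer

The whole point of the Java Stream API is that such a stream is consumed at most once. It has no buffering feature, nor does it support a push-based streaming model the way reactive stream implementations do.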
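To make the "consumed at most once" rule concrete, here is a minimal sketch using only the JDK. The class and method names (StreamOnceDemo, consumeTwice) are made up for illustration, and a plain in-memory stream stands in for the one returned by fetchStream(). The second terminal operation on the same Stream instance fails with an IllegalStateException, which is the same kind of failure as the "stream has already been operated upon or closed" message quoted in the question.

```java
import java.util.List;
import java.util.stream.Stream;

final class StreamOnceDemo {

    /**
     * Runs two terminal operations on the same stream instance.
     * Returns the exception thrown by the second one, or null if none was thrown.
     */
    static IllegalStateException consumeTwice(Stream<String> stream) {
        stream.forEach(s -> { });      // first terminal operation: allowed
        try {
            stream.forEach(s -> { });  // second terminal operation on the same instance
            return null;
        } catch (IllegalStateException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in data; a real jOOQ stream would carry Record objects.
        IllegalStateException failure = consumeTwice(List.of("row-1", "row-2").stream());
        System.out.println(failure == null ? "no failure" : failure.getMessage());
    }
}
```

Wrapping the query in a Supplier, as the question does, avoids this exception only because each get() builds a brand new stream, which also means the query is executed again for every call.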

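You could add another API to your stack, such as Reactor (there are others, but since you are already using Spring...), which supports buffering and replaying streams to multiple consumers. However, that is not directly related to jOOQ and would have significant implications for your application architecture.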
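As a rough illustration of what "buffering and replaying" means, the sketch below uses only the JDK rather than Reactor, so it is a stand-in for the idea and not Reactor's API. The class name ReplayableRows is hypothetical. It drains the source stream once into a list and then hands a fresh stream to every consumer. Note the trade-off: this buffers everything in memory, so it gives up the laziness that motivated streaming in the first place.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: buffers a one-shot stream so it can be replayed to several consumers. */
final class ReplayableRows<T> {

    private final List<T> buffer;

    ReplayableRows(Stream<T> source) {
        // Drain the source exactly once and close it (a jOOQ stream holds database resources).
        try (Stream<T> s = source) {
            this.buffer = Collections.unmodifiableList(s.collect(Collectors.toList()));
        }
    }

    /** Every call returns a new stream over the buffered rows. */
    Stream<T> stream() {
        return buffer.stream();
    }

    public static void main(String[] args) {
        ReplayableRows<String> rows = new ReplayableRows<>(Stream.of("row-1", "row-2", "row-3"));
        rows.stream().forEach(r -> System.out.println("logged: " + r)); // first consumer
        System.out.println(rows.stream().count());                      // second consumer
    }
}
```

A reactive library does this kind of replay without necessarily holding the full result, which is part of why adopting one affects the overall architecture.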

Note that jOOQ's ResultQuery extends org.reactivestreams.Publisher and JDK 9's Flow.Publisher, for better interoperability with such reactive streams.
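For readers unfamiliar with the push-based model, here is a small sketch of the JDK 9 Flow API on its own. It does not use jOOQ: the JDK's SubmissionPublisher stands in for the publisher, and the names FlowDemo, CollectingSubscriber and publishAndCollect are invented for this example. The subscriber asks for one item at a time, and the publisher pushes items to it as they become available.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class FlowDemo {

    /** Requests one item at a time and completes a future once the publisher signals completion. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> received = new CopyOnWriteArrayList<>();
        final CompletableFuture<List<T>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1);                 // signal demand for the first item
        }
        @Override public void onNext(T item) {
            received.add(item);
            subscription.request(1);      // ask for the next item only after handling this one
        }
        @Override public void onError(Throwable t) { done.completeExceptionally(t); }
        @Override public void onComplete() { done.complete(List.copyOf(received)); }
    }

    /** Pushes the given rows through a publisher and returns what the subscriber received. */
    static List<String> publishAndCollect(List<String> rows) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            rows.forEach(publisher::submit);
        } // close() issues onComplete after the submitted items have been delivered
        return subscriber.done.join();
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("row-1", "row-2", "row-3")));
    }
}
```

Going by the note above, a jOOQ ResultQuery could presumably be subscribed to in a similar way, but that wiring is not shown or tested here.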