使用 R2DBC 等待多条语句执行

Wait for multiple statements to execute with R2DBC

我需要使用 R2DBC 执行多个语句,但找不到有用的 DatabaseClient#inConnection* 示例...所以我的函数总是太早出门:

public Publisher<Person> groupStatements(DatabaseClient client, Person person) {
// yes, I know that's harsh, but hey! so is JPA's @ElementCollection
return client.sql("DELETE FROM persons_address WHERE person = :id")
    .bind("id", person.getId())
    .fetch().rowsUpdated()
    .map(deleted -> {
        // now recreate every relationship
        GenericExecuteSpec statement = client.sql("INSERT INTO persons_address (person, address) VALUES (:person, :address)");
        person.getOfficePlaces().forEach(address -> {
            statement
                .bind("person", person.getId()).bind("address", address.getId())
                .fetch().rowsUpdated()  // there we go AWOL
                .subscribe(inserted -> {
                    // logging here
                });
        });
        return person;  //FIXME wait! need above grouped statements to complete
    });
}

注意:我使用 H2 作为后端。 感谢您提供任何信息!

我找到了一个合适的批处理技术(这里替换了 map/deleted 部分),但甚至卡住了,因为 Statement#execute 是 returning 一个只有 #subscribe 方法的发布者,我不能' t return 来自链。所以我用一些齿轮喂养了野兽

//DEBUG I couldn't figure out how to use labels! good enough
private static final String SQL_INSERT = "INSERT INTO persons_address (person, address) VALUES (, )";
...
.flatMap(deleted -> {
    if (person.getOfficePlaces().isEmpty()) {
        return Mono.just(person);
    } else {
        return client.inConnection(cnx -> {
            Statement stmt = cnx.createStatement(SQL_INSERT);
            person.getOfficePlaces().forEach(address -> {
                stmt.bind(0, person.getId()).bind(1, address.getId()).add();
            });
            return Flux.from(stmt.execute()).last().map(dontcare -> person);
        });
    }