使用 R2DBC 等待多条语句执行
Wait for multiple statements to execute with R2DBC
我需要使用 R2DBC 执行多个语句,但找不到有用的 DatabaseClient#inConnection* 示例...所以我的函数总是太早出门:
public Publisher<Person> groupStatements(DatabaseClient client, Person person) {
// yes, I know that's harsh, but hey! so is JPA's @ElementCollection
return client.sql("DELETE FROM persons_address WHERE person = :id")
.bind("id", person.getId())
.fetch().rowsUpdated()
.map(deleted -> {
// now recreate every relationship
GenericExecuteSpec statement = client.sql("INSERT INTO persons_address (person, address) VALUES (:person, :address)");
person.getOfficePlaces().forEach(address -> {
statement
.bind("person", person.getId()).bind("address", address.getId())
.fetch().rowsUpdated() // there we go AWOL
.subscribe(inserted -> {
// logging here
});
});
return person; //FIXME wait! need above grouped statements to complete
});
}
注意:我使用 H2 作为后端。
感谢您提供任何信息!
我找到了一个合适的批处理技术(这里替换了 map/deleted 部分),但甚至卡住了,因为 Statement#execute 是 returning 一个只有 #subscribe 方法的发布者,我不能' t return 来自链。所以我用一些齿轮喂养了野兽
//DEBUG I couldn't figure out how to use labels! good enough
private static final String SQL_INSERT = "INSERT INTO persons_address (person, address) VALUES (, )";
...
.flatMap(deleted -> {
if (person.getOfficePlaces().isEmpty()) {
return Mono.just(person);
} else {
return client.inConnection(cnx -> {
Statement stmt = cnx.createStatement(SQL_INSERT);
person.getOfficePlaces().forEach(address -> {
stmt.bind(0, person.getId()).bind(1, address.getId()).add();
});
return Flux.from(stmt.execute()).last().map(dontcare -> person);
});
}
我需要使用 R2DBC 执行多个语句,但找不到有用的 DatabaseClient#inConnection* 示例...所以我的函数总是太早出门:
public Publisher<Person> groupStatements(DatabaseClient client, Person person) {
// yes, I know that's harsh, but hey! so is JPA's @ElementCollection
return client.sql("DELETE FROM persons_address WHERE person = :id")
.bind("id", person.getId())
.fetch().rowsUpdated()
.map(deleted -> {
// now recreate every relationship
GenericExecuteSpec statement = client.sql("INSERT INTO persons_address (person, address) VALUES (:person, :address)");
person.getOfficePlaces().forEach(address -> {
statement
.bind("person", person.getId()).bind("address", address.getId())
.fetch().rowsUpdated() // there we go AWOL
.subscribe(inserted -> {
// logging here
});
});
return person; //FIXME wait! need above grouped statements to complete
});
}
注意:我使用 H2 作为后端。 感谢您提供任何信息!
我找到了一个合适的批处理技术(这里替换了 map/deleted 部分),但甚至卡住了,因为 Statement#execute 是 returning 一个只有 #subscribe 方法的发布者,我不能' t return 来自链。所以我用一些齿轮喂养了野兽
//DEBUG I couldn't figure out how to use labels! good enough
private static final String SQL_INSERT = "INSERT INTO persons_address (person, address) VALUES (, )";
...
.flatMap(deleted -> {
if (person.getOfficePlaces().isEmpty()) {
return Mono.just(person);
} else {
return client.inConnection(cnx -> {
Statement stmt = cnx.createStatement(SQL_INSERT);
person.getOfficePlaces().forEach(address -> {
stmt.bind(0, person.getId()).bind(1, address.getId()).add();
});
return Flux.from(stmt.execute()).last().map(dontcare -> person);
});
}