Vert.x:阻塞处理程序问题
Vert.x: Blocking handler issue
我想使用阻塞处理程序,但仍然出现错误:
java.lang.IllegalStateException: Response has already been written
这是我的代码:
Server.java
r.route("/api/details/send/").handler(BodyHandler.create());
r.route("/api/details/send/").handler(ctx-> {
JsonArray ja = ctx.getBodyAsJsonArray();
JsonArray params = new JsonArray();
vertx.executeBlocking(futur -> {
for(int i =0; i<ja.size();i++) {
JsonObject req = new JsonObject();
req.put("QUERY", "INSERT INTO detailsfacture VALUES ('',?,?,?,?,?,?,?)");
req.put("DB", "MYSQL_");
params.add(ja.getJsonObject(i).getValue("typefacture"))
.add(ja.getJsonObject(i).getValue("activites"))
.add(Integer.parseInt(ja.getJsonObject(i).getValue("qte").toString()))
.add(Double.parseDouble(ja.getJsonObject(i).getValue("pu").toString())
.add(ja.getJsonObject(i).getValue("unite"))
.add(Double.parseDouble(ja.getJsonObject(i).getValue("montant").toString())
.add(ja.getJsonObject(i).getValue("codefacture"));
req.put("PARAMS", params);
eb.send("EXECUTE", req, res -> {
if (res.succeeded()) {
params.clear();
ctx.response().putHeader("content-type", "application/json").end(res.result().body().toString());
} else {
ctx.response().putHeader("content-type", "application/json").end(res.cause().getMessage());
}
});
}
String result = "orsys";
futur.complete(result);
},resultat->{
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
//resultat.result().toString();
});
});
MySql.java
eb.consumer("MYSQL_EXECUTE_WITH_PARAMS", req->{
try{
JsonObject reqParams = (JsonObject)req.body();
String sql = reqParams.getString("QUERY");
client.getConnection( connection -> {
if (connection.succeeded()) {
try{
SQLConnection con = connection.result();
con.updateWithParams(sql,reqParams.getJsonArray("PARAMS"), query -> {
if(query.succeeded()){
UpdateResult urs = query.result();
req.reply(urs.toJson());
//req.reply(query.result());
}else{
req.fail(24, "Err Request : "+query.cause().getMessage());
}
});
}catch(Exception e){
req.fail(24, "Err Conn Failed : "+e.getMessage());
}
} else {
req.fail(24, "Err No Connection : "+connection.cause().getMessage());
}
});
}catch(Exception e){
req.fail(24, e.getMessage());
}
});
P.S。 :当我删除 executeBlocking 时,只有第一条记录在我的数据库中注册。
问候。
您在循环中将实体插入 detailsfacture
。对于每个插入,您调用以下内容:
ctx.response().putHeader("content-type", "application/json").end(res.result().body().toString());
如您所见,您调用了响应 object 的 end(...)
方法。这就是 IllegalStateException
的来源。正如 documentation 所述:
Once the response has ended, it cannot be used any more.
所以你的问题与executeBlocking
无关。
你应该看看 HttpServerResponse
的 write(...)
方法。对于每个插入,您应该调用 write(...)
而不是 end(...)
。但这只有在您知道整个响应的完整长度时才有效,因为您需要设置 header Content-length
。如果您完成所有插入,您需要调用 end()
来完成响应。此外,您应该只设置 header 一次,而不是为每个插入设置。
现在补充一些评论。我认为您的情况不需要 executeBlocking
。由于 Content-length
的问题,我建议用 Future
包裹每个插入,并用 CompositeFuture
组合它们。 Future
futur
用错了。 Event bus的send(...)
方法是非阻塞异步的。因此 futur.complete(result)
在您发送所有插入内容后立即被调用。同样奇怪的是 consumer
消耗 MYSQL_EXECUTE_WITH_PARAMS
而 send
发送到 EXECUTE
.
我尝试了另一种解决方案来获取我的查询 (?,?,...,?),(?,?,...,?),..,(?,?,... ,?).
这是我的代码:
public static String getMultipleInsertReq(String table, JsonArray columns,JsonArray data){
JsonObject tab= Tables.Tables_list.getJsonObject(table); // name of table
String sql = "";
if(tab != null){
sql = "INSERT INTO "+table + "( ";
if(columns == null){
columns = tab.getJsonArray("COLS"); //columns from ur database
}
if(columns!=null){
for(int i=0;i<columns.size();i++){
if(i==columns.size()-1){
sql+=columns.getString(i)+") VALUES";
}
else{
sql+=columns.getString(i)+",";
}
}
for(int i =0; i<data.size();i++){
for(int j=0; j<columns.size();j++){
if(j==columns.size()-1 && i!=data.size()-1){
sql+="?),";
}
else if (i==data.size()-1 && j==columns.size()-1){
sql+="?)";
}
else if (j==0){
sql+="(?,";
}
else{
sql+="?,";
}
}
}
return sql;
}
}
return null;
}
希望对您有所帮助。
P.S.: 它只是一个查询生成器,因此您可以根据需要进行调整。
此致。
我想使用阻塞处理程序,但仍然出现错误:
java.lang.IllegalStateException: Response has already been written
这是我的代码:
Server.java
r.route("/api/details/send/").handler(BodyHandler.create());
r.route("/api/details/send/").handler(ctx-> {
JsonArray ja = ctx.getBodyAsJsonArray();
JsonArray params = new JsonArray();
vertx.executeBlocking(futur -> {
for(int i =0; i<ja.size();i++) {
JsonObject req = new JsonObject();
req.put("QUERY", "INSERT INTO detailsfacture VALUES ('',?,?,?,?,?,?,?)");
req.put("DB", "MYSQL_");
params.add(ja.getJsonObject(i).getValue("typefacture"))
.add(ja.getJsonObject(i).getValue("activites"))
.add(Integer.parseInt(ja.getJsonObject(i).getValue("qte").toString()))
.add(Double.parseDouble(ja.getJsonObject(i).getValue("pu").toString())
.add(ja.getJsonObject(i).getValue("unite"))
.add(Double.parseDouble(ja.getJsonObject(i).getValue("montant").toString())
.add(ja.getJsonObject(i).getValue("codefacture"));
req.put("PARAMS", params);
eb.send("EXECUTE", req, res -> {
if (res.succeeded()) {
params.clear();
ctx.response().putHeader("content-type", "application/json").end(res.result().body().toString());
} else {
ctx.response().putHeader("content-type", "application/json").end(res.cause().getMessage());
}
});
}
String result = "orsys";
futur.complete(result);
},resultat->{
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
//resultat.result().toString();
});
});
MySql.java
eb.consumer("MYSQL_EXECUTE_WITH_PARAMS", req->{
try{
JsonObject reqParams = (JsonObject)req.body();
String sql = reqParams.getString("QUERY");
client.getConnection( connection -> {
if (connection.succeeded()) {
try{
SQLConnection con = connection.result();
con.updateWithParams(sql,reqParams.getJsonArray("PARAMS"), query -> {
if(query.succeeded()){
UpdateResult urs = query.result();
req.reply(urs.toJson());
//req.reply(query.result());
}else{
req.fail(24, "Err Request : "+query.cause().getMessage());
}
});
}catch(Exception e){
req.fail(24, "Err Conn Failed : "+e.getMessage());
}
} else {
req.fail(24, "Err No Connection : "+connection.cause().getMessage());
}
});
}catch(Exception e){
req.fail(24, e.getMessage());
}
});
P.S。 :当我删除 executeBlocking 时,只有第一条记录在我的数据库中注册。 问候。
您在循环中将实体插入 detailsfacture
。对于每个插入,您调用以下内容:
ctx.response().putHeader("content-type", "application/json").end(res.result().body().toString());
如您所见,您调用了响应 object 的 end(...)
方法。这就是 IllegalStateException
的来源。正如 documentation 所述:
Once the response has ended, it cannot be used any more.
所以你的问题与executeBlocking
无关。
你应该看看 HttpServerResponse
的 write(...)
方法。对于每个插入,您应该调用 write(...)
而不是 end(...)
。但这只有在您知道整个响应的完整长度时才有效,因为您需要设置 header Content-length
。如果您完成所有插入,您需要调用 end()
来完成响应。此外,您应该只设置 header 一次,而不是为每个插入设置。
现在补充一些评论。我认为您的情况不需要 executeBlocking
。由于 Content-length
的问题,我建议用 Future
包裹每个插入,并用 CompositeFuture
组合它们。 Future
futur
用错了。 Event bus的send(...)
方法是非阻塞异步的。因此 futur.complete(result)
在您发送所有插入内容后立即被调用。同样奇怪的是 consumer
消耗 MYSQL_EXECUTE_WITH_PARAMS
而 send
发送到 EXECUTE
.
我尝试了另一种解决方案来获取我的查询 (?,?,...,?),(?,?,...,?),..,(?,?,... ,?).
这是我的代码:
public static String getMultipleInsertReq(String table, JsonArray columns,JsonArray data){
JsonObject tab= Tables.Tables_list.getJsonObject(table); // name of table
String sql = "";
if(tab != null){
sql = "INSERT INTO "+table + "( ";
if(columns == null){
columns = tab.getJsonArray("COLS"); //columns from ur database
}
if(columns!=null){
for(int i=0;i<columns.size();i++){
if(i==columns.size()-1){
sql+=columns.getString(i)+") VALUES";
}
else{
sql+=columns.getString(i)+",";
}
}
for(int i =0; i<data.size();i++){
for(int j=0; j<columns.size();j++){
if(j==columns.size()-1 && i!=data.size()-1){
sql+="?),";
}
else if (i==data.size()-1 && j==columns.size()-1){
sql+="?)";
}
else if (j==0){
sql+="(?,";
}
else{
sql+="?,";
}
}
}
return sql;
}
}
return null;
}
希望对您有所帮助。 P.S.: 它只是一个查询生成器,因此您可以根据需要进行调整。
此致。