vert.x 工作线程中的异常

exception in vert.x worker thread

我是 vert.x 平台的新手。我的项目中有一个标准 Verticle 和一个 Worker Verticle,它们通过 eventBus 进行通信。 worker Verticle 在循环和数据库访问中执行多个 REST API 调用。

我的问题是 worker Verticle 在某些时候毫无问题地完成了任务 运行 但有时它会抛出以下错误。

Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed

我正在使用 kotlin 协程来处理 constructDevice(vertx: Vertx) 执行大部分 REST API 调用和数据库访问的函数。

任何人都可以告诉我上述问题的原因是什么,还有什么方法可以改进 constructDevice(vertx: Vertx) 函数以有效处理多个 REST API 调用和 MongoDB 访问。

    // worker verticle to handle multiple REST API calls and MongoDB database access
    
    class DeviceDiscoverVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val consumer = vertx.eventBus().localConsumer<String>("listDevice")
            consumer.handler { message ->
                CoroutineScope(vertx.dispatcher()).launch {
                    constructDevice(vertx)
                }
                message.reply("discovered")
            }
        }
    }
    
    // standard verticle to communicate with worker verticle 
    
    class ListDeviceVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val reply = awaitResult<Message<String>> { h ->
                vertx.eventBus().request("listDevice", "deviceIPs", h)
            }
            println("Reply received: ${reply.body()}")
        }
    }
    
    fun main() {
        val vertx = Vertx.vertx()
        val workOption = DeploymentOptions().setWorker(true)
        vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
        vertx.deployVerticle(ListDeviceVerticle())
    }


    suspend fun constructDevice(vertx: Vertx) {
        val deviceRepository = listOf(
            "10.22.0.106",
            "10.22.0.120",
            "10.22.0.115",
            "10.22.0.112"
        )
    
        val webClient = WebClient.create(vertx)
        val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
        val mongoClient: MongoClient = MongoClient.create(vertx, config)
        val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
        
        // loop through the IP list and calls REST endpoints
        
        val deviceList = deviceRepository.map { deviceIP ->
            val deviceIPconfig: DeviceIPconfig
            val deviceType: DeviceType
            val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
            val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
    
            val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceIP.send(handler)
            }
            deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
            } else {
                println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                DeviceIPconfig()
            }
            
            val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceType.send(handler)
            }
            if (responseDeviceType.statusCode() == 200) {
                deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                val device = DeviceModel(deviceIPconfig, deviceType)
                json {
                    obj(
                        "_id" to deviceIPconfig.localMac,
                        "device" to json.stringify(DeviceModel.serializer(), device)
                    )
                }
            } else {
                println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                jsonObjectOf()
            }
    
        }.filterNot { it.isEmpty }
        
        // construct data to upload in mongoDB
        
        val activeDeviceIDs = json {
            obj("_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
        }
        val activeDevices = json {
            obj("_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }
        
        // save the data in MongoDB
        
        mongoClient.save("devices", activeDeviceIDs) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
        mongoClient.save("devices", activeDevices) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
    }

更新问题:1

@Damian 我已经根据您的意见更新了我的问题。我在上面简化了我的问题以便于理解,但是当我尝试使用 promise/future 实现这些东西时,我在某个时候陷入困境。

我的任务是从不同的 REST 端点获取数据并从中获取 kotlin class 合同,我想并行处理。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            // this will return Json array and each element of that array needs to be called again in a loop.
            val result = asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            deviceDevicesPromise.complete(result)
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {

    val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
    // I need to call other rest points similar to this and I need map the result to kotlin class.

   // how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>. 

}

class DeviceDiscoverVerticle : AbstractVerticle() {
        override fun start() {
            val deviceRepository = // list of IP strings
    
            val webClient = WebClient.create(vertx)
            vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
                deviceRepository.forEach { deviceIP ->
                    val futureList = constructDevice(vertx, webClient, deviceIP)
                    CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                            if (allFuturesResult.succeeded()) {
                                // how to handle individual future result here to construct data
                            } else {
                                println("failed")
                            }
                        }
                }
            }
        }

更新问题:2

@Damian 按照你的建议更新了我的代码。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed")
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        }
        else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}


fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if(ar.succeeded()) {
            val futureList = ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            CompositeFuture.all(futureList).onComplete { asyncResult ->
                if (asyncResult.succeeded()) {
                    asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                        //not all future in futureList are completed here some of them shows Future{unresolved}
                    }
                    constructDevicePromise.complete(DeviceFlow(label = "xyz"))
                }
                else {
                    constructDevicePromise.fail("failed")
                }
            }

        }
    }
    return constructDevicePromise.future()
}


class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = //list of IPs

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                val constructDeviceFuture = constructDevice(webClient, deviceIP)
                constructDeviceFuture.onComplete {ar ->
                    //println(ar.result().toString())
                }
            }
        }
    }
}

我的问题在里面

CompositeFuture.all(futureList).onComplete { asyncResult ->
                        if (asyncResult.succeeded()) {
                            asyncResult.result().list<HttpResponse<Buffer>>().forEach {

这里大部分期货都没有解决,执行在这里被挂起。

[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

所以我根据 vert.x 将 CompositeFuture.all(futureList).onComplete 更改为 CompositeFuture.join(futureList).onComplete 文档加入将等待所有未来完成

The join composition waits until all futures are completed, either with a success or a failure. CompositeFuture.join takes several futures arguments (up to 6) and returns a future that is succeeded when all the futures are succeeded and failed when all the futures are completed and at least one of them is failed

但现在很少有期货失败了。这是更改为 CompositeFuture.join

后未来列表的输出
CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
                            if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
                                asyncResult.result().list<HttpResponse<Buffer>>().forEach {



[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

因为我的设备无法处理并发请求,很少有 futures 失败?还有为什么程序执行卡在里面

asyncResult.result().list<HttpResponse<Buffer>>().forEach { 

如果设备并发请求处理有问题,那么该问题的其他解决方案是什么。是否可以 运行 整个其余部分调用 vertx 环境并通过事件总线与其通信?

此外,如果我将 DeviceDiscoverVerticle 部署为标准 Verticle 而不是 worker Verticle,应用程序将完全卡在 CompositeFuture.all(futureList).onComplete

我不熟悉 kotlin 和协程,但我可能对 vert.x 本身有一些建议。首先根据 documentation

In most cases, a Web Client should be created once on application startup and then reused. Otherwise you lose a lot of benefits such as connection pooling and may leak resources if instances are not closed properly.

我看到您在 constructDevice 方法中调用了 Webclient.create(vertx),因此每次发送 'listDevice' 事件时都会创建新的 WebClient,因此您可以考虑更改它。

我最近有非常相似的事情要做,最后使用了 Futures。请注意,当您调用 awaitResult 时,您正在阻塞线程以等待异步执行,如果那是标准的 Verticle,您确实会收到阻塞线程警告的垃圾邮件。你可以做的是创建一个承诺,complete/fail 它在你的 http 处理程序中,在处理程序之外你只是 return promise.future() 对象。在循环之外你可以处理所有的 futures,区别在于 futures 处理也是异步的,所以你不会阻塞线程。

此外,为了使代码更简洁并利用 vert.x 异步特性,最好将 http 和 mongo 处理拆分为单独的 Verticle i。 e.

  1. HttpVerticle 获取 listDevice 事件
  2. HttpVerticle 为 5 个不同的请求创建 5 个 futures
  3. 当所有期货完成时 future.onComplete()/compositeFuture.all() 被触发并发送 'updateDB' 事件
  4. MongoVerticle 接收并处理 'updateDB' 事件

这里可能没有解决您的具体问题,但我希望它至少能引导您更进一步

在评论之后这是java

中的期货示例
public class HttpVerticle extends AbstractVerticle {

WebClient webClient;

@Override
public void start() throws Exception {

    webClient = WebClient.create(vertx);

    vertx.eventBus().consumer("run_multiple_requests", event -> {
        //When event is received this block is handled by some thread from worker pool, let's call it 'main thread'
        Promise<HttpResponse<Buffer>> request1Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request2Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request3Promise = Promise.promise();

        //Since webclient is async, all calls will be asynchronous
        webClient.get("ip1", "/endpoint")
                .send(asyncResult -> {
                    //async block #1 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request1Promise.complete(asyncResult.result());
                    } else {
                        request1Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 is probably still processing
        webClient.get("ip2", "/endpoint")
                .send(asyncResult -> {
                    //async block #2 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request2Promise.complete(asyncResult.result());
                    } else {
                        request2Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 and #2 are probably still processing
        webClient.get("ip3", "/endpoint")
                .send(asyncResult -> {
                    //async block #3 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request3Promise.complete(asyncResult.result());
                    } else {
                        request3Promise.fail("Http request failed");
                    }
                });

        //retrieving futures from promises
        Future<HttpResponse<Buffer>> future1 = request1Promise.future();
        Future<HttpResponse<Buffer>> future2 = request2Promise.future();
        Future<HttpResponse<Buffer>> future3 = request3Promise.future();

       
        CompositeFuture.all(future1, future2, future3).onComplete(allFuturesResult -> {
            //async block #4 this will be executed only when all futures complete, but since it's async it does
            // not block our 'main thread'
            if (allFuturesResult.succeeded()) {
                //all requests succeeded
                vertx.eventBus().send("update_mongo", someMessage);
            } else {
                //some of the requests failed, handle it here
            }
        });
        
        //at this point async block #1 #2 #3 are probably still processing and #4 is waiting for callback
        //but we leave our event handler and free 'main thread' without waiting for anything
    });
}

当然,这段代码可以(而且应该)更短,所有这些都是硬编码的,没有任何数组和循环,只是为了清楚起见

如果你使用 logback 或 log4j(可能还有其他),你可以将 [%t] 放在日志模式中,它会在日志消息中显示你的线程名称,对我个人而言,这对理解所有流程非常有帮助这些异步块

还有一件事,使用此设置,所有三个请求实际上将同时发送,因此请确保 http 服务器能够同时处理多个请求。

更多地了解您要实现的目标,首先在方法 constructDeviceDevices() 中,我会将 return 类型更改为 Future<HttpResponse<Buffer>> 如果成功,只需调用 deviceDevicesPromise.complete(asyncResult.result())

然后,在 constructDevice() 方法中,我将调用修改后的 constructDeviceDevices() 方法并从中获取未来对象,我们称之为 Future<HttpResponse<Buffer>> httpDevicesFuture。下一步是在该处理程序中调用 httpDevicesFuture.onComplete(ar -> {<handler code>}),您可以访问 ar.result(),这是来自“.../devices/”端点的响应,所以现在在同一个块中,我将遍历该响应并获得List<Future<HttpResponse<Buffer>>>。仍然停留在同一个区块,我会写 CompositeFuture.all(futuresList).onComplete(ar -> handler) 这个 ar 将是 CompositeFuture 类型 它有一个方法 list() 实际上是 returns 已完成期货列表(在这个处理程序中,它们都完成了)所以现在使用该列表,您可以为每个未来检索 HttpResponse<Buffer>,并且每个都将是您的“.../devices/$device”响应,您可以映射它们到你想要的任何对象。现在在同一个处理程序中,我将决定下一步我想去哪里,我可能会通过在 eventBus 上发送一条消息来做到这一点,比如 eventBus.send("HTTP_PROCESSING_DONE", serializedDevices) 或者万一出现问题 eventBus.send("HTTP_FAILURE", someMessage)。但是在你的情况下,如果你想为某个列表中的每个 IP 执行所有这些操作,而不是强制它同步然后仍然在同一个块中,你可以进行任何对象映射并调用 constructDeviceFuture.complete(mappedObject/List<MappedObject>) 这意味着你必须再创造一个未来,你将从 constructDevice() 方法 return

基本上你被卡住了,因为你试图在异步世界中重现顺序执行,特别是在你尝试 return 来自 constructDevice() 方法的值的那一刻,这意味着我们实际上想要在处理这行代码时等待所有执行完成,而在 vert.x 中并非如此。

它看起来像那样(语法可能已关闭,因此将其视为伪代码)

    fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Future<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): Future<SomeDomainObject> {

    //Type of below promise depends on what you are mapping responses to. It may also be a list of mapped objects
    val constructDevicePromise: Promise<SomeDomainObject> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if (ar.succeeded()) {
            val futureList: List<Future<HttpResponse<Buffer>>>
            //loop through ar.result() and populate deviceDevicesFuture list

            CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                if (allFuturesResult.succeeded()) {
                    // here you have access to allFuturesResult.list() method
                    // at this point you know all futures have finished, you can retrieve result from them (you may need to cast them from Object)
                    // when you have List<HttpResponse> you map it to whatever you want
                    val myMappedObject: SomeDomainObject = mappingResult()
                    constructDevicePromise.complete(myMappedObject)
                } else {
                    constructDevicePromise.fail("failed")
                }
            }
        }
    }

    return constructDevicePromise.future()
}

class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = // list of IP strings

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                //here dependent on your logic, you handle each future alone or create a list and handle them together
                val constructDeviceFuture: Future<SomeDomainObject> = constructDevice(vertx, webClient, deviceIP)
                constructDeviceFuture.onComplete(ar -> {
                    ar.result() // <- this is your mapped object
                    eventBus.send("SOME_NEXT_LOGIC", serializedDomainObject)
                })
            }
            
            //if you need to handle all devices at once, once again you need to make CompositeFuture from all responses of constructDevice
        }
    }
}

更新 2 回复

关于CompositeFuture.all(): 你错过了一件事,CompositeFuture.all() waits until all futures succeeds OR at least one failed. 如果连一个都失败了,它就不会等待其他人(这就像逻辑与,不需要等待其余的因为我们已经知道结果)。 CompositeFuture.join() 另一方面只是等待所有 futures 完成,但如果其中任何一个失败,由此产生的 future 也将失败(但你至少应该得到所有 futures 的结果)。

这实际上就是您在输出中看到的内容,CompositeFuture.all() 您得到一堆已完成的 Futures,一个失败,其余未解决。

这部分还缺少一件事:

vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
        deviceRepository.forEach { deviceIP ->
            val constructDeviceFuture = constructDevice(webClient, deviceIP)
            constructDeviceFuture.onComplete {ar ->
                //println(ar.result().toString())
            }
        }
    }

你没有检查是否ar.succeeded(),如果你会看到最终的未来实际上是失败的,这就是为什么最终结果不是你所期望的。

现在纯粹是猜测发生了什么。如果你在单个请求处理程序中放置一些毫秒级精度的日志消息,那么你可能会(在某种程度上)杀死这个剩余 API(我假设每个 vertx 事件都是相同的 API)您可能应该看到请求之间相隔几毫秒。我想 API 能够满足很少的请求,然后下一个由于某些 exception/block/timeout 或其他原因而失败,而所有其他人可能根本没有得到响应,或者正在等待它们命中一些超时。如果你将 Verticle 定义为标准,当任何事情持续超过两秒时你会收到警告,更重要的是,有一个线程处理所有这些东西所以如果一个请求挂起很长时间,标准 Verticle 将完全没有响应那时。这可能是您陷入 CompositeFuture.join() 方法的原因。

所以现在你可以做几件事:

  1. 您可以将并发执行更改为sequential execution。基本上,不是事先创建 n 未来,而是为单个元素创建未来,然后调用 future.compose(ar -> {}) 此处理程序将仅在未来完成时调用。然后在同一个处理程序中为下一个元素创建和 return 未来。实现 imo 有点棘手,但可行(我已经使用 java stream reduce 将 x future 减少为单个)。当你这样实现时,你一次只有一个请求,所以 API 应该没有问题。请注意,不同的 IP 仍将同时处理,但每个 IP 的请求将是顺序的,因此它可能工作得很好。

  2. 您可以创建另一个标准 Verticle,它将仅响应单个事件,该事件将调用“/devices/$device”端点。现在,在您现在拥有的代码中,当您循环遍历初始 http 响应时,您无需再生成 20 个 HTTP 请求,而只需将 20 个事件发送到 eventBus。当您只有一个 Verticle 实例处理该特定消息时,并且它是一个只有一个线程的标准 Verticle,实际上此时应该只处理一条消息并且应该排队。这也非常容易调整,因为您可以增加 Verticle 实例的数量,并且您将拥有与 Verticle 实例数量一样多的并发请求。

  3. 你提到完全在vertx之外处理它,我认为这根本没有必要,但如果你认为它最适合你,那就很简单了。如果您已经从某个地方获得了 Vertx 个对象,那么将该对象传递给其他 class 的构造函数是没有问题的。在那里,你可以拥有自己的 http 客户端,你自己的方法,基本上任何你想要的东西,并且在某个时候当你决定要使用 vert.x 时,你可以调用 vertx.eventBus().send() 并触发一些逻辑,这将由 vert.x 处理。要记住的最重要的事情就是不要创建多个 Vertx 对象的实例,因为它们将有单独的事件总线。实际上如文档所述

Verticles ... This model is entirely optional and Vert.x does not force you to create your applications in this way if you don’t want to.

所以你可以让你的常规应用程序在任何框架中编写,并且在某些时候仍然只是实例化 Vertx 对象,执行单个任务,然后返回到你的基础框架,但老实说,我认为你非常接近解决这个问题:)