исключение в рабочем потоке vert.x
Я новичок в платформе vert.x. В моем проекте есть стандартная и рабочая версия, которая общается через eventBus. Вертикаль worker выполняет несколько вызовов REST API в цикле и при доступе к базе данных.
Моя проблема в том, что рабочая вертикаль выполняет задачу без проблем при некотором запуске, но иногда она выдает ошибку ниже.
Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed
Я использую корутину котлина для обработки constructDevice(vertx: Vertx)
функция, которая выполняет большинство вызовов REST API и доступа к базе данных.
Может ли кто-нибудь сказать мне, в чем причина вышеуказанной проблемы, также есть ли способ улучшить constructDevice(vertx: Vertx)
функция для эффективной обработки нескольких вызовов REST API и доступа к MongoDB.
// worker verticle to handle multiple REST API calls and MongoDB database access
class DeviceDiscoverVerticle : CoroutineVerticle() {
override suspend fun start() {
val consumer = vertx.eventBus().localConsumer<String>("listDevice")
consumer.handler { message ->
CoroutineScope(vertx.dispatcher()).launch {
constructDevice(vertx)
}
message.reply("discovered")
}
}
}
// standard verticle to communicate with worker verticle
class ListDeviceVerticle : CoroutineVerticle() {
override suspend fun start() {
val reply = awaitResult<Message<String>> { h ->
vertx.eventBus().request("listDevice", "deviceIPs", h)
}
println("Reply received: ${reply.body()}")
}
}
fun main() {
val vertx = Vertx.vertx()
val workOption = DeploymentOptions().setWorker(true)
vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
vertx.deployVerticle(ListDeviceVerticle())
}
suspend fun constructDevice(vertx: Vertx) {
val deviceRepository = listOf(
"10.22.0.106",
"10.22.0.120",
"10.22.0.115",
"10.22.0.112"
)
val webClient = WebClient.create(vertx)
val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
val mongoClient: MongoClient = MongoClient.create(vertx, config)
val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
// loop through the IP list and calls REST endpoints
val deviceList = deviceRepository.map { deviceIP ->
val deviceIPconfig: DeviceIPconfig
val deviceType: DeviceType
val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
requestDeviceIP.send(handler)
}
deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
} else {
println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
DeviceIPconfig()
}
val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
requestDeviceType.send(handler)
}
if (responseDeviceType.statusCode() == 200) {
deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
val device = DeviceModel(deviceIPconfig, deviceType)
json {
obj(
"_id" to deviceIPconfig.localMac,
"device" to json.stringify(DeviceModel.serializer(), device)
)
}
} else {
println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
jsonObjectOf()
}
}.filterNot { it.isEmpty }
// construct data to upload in mongoDB
val activeDeviceIDs = json {
obj("_id" to "activeDeviceIDs",
"activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
}
val activeDevices = json {
obj("_id" to "activeDevices",
"activeDevices" to json { array(deviceList) }
)
}
// save the data in MongoDB
mongoClient.save("devices", activeDeviceIDs) { res ->
if (res.succeeded()) {
println("saved successfully")
} else {
res.cause().printStackTrace()
}
}
mongoClient.save("devices", activeDevices) { res ->
if (res.succeeded()) {
println("saved successfully")
} else {
res.cause().printStackTrace()
}
}
}
Обновленный вопрос: 1
@Damian Я обновил свой вопрос на основе ваших отзывов. Я упростил свой вопрос выше, чтобы его было легко понять, но когда я попытался реализовать вещи, используя обещание / будущее, я в какой-то момент застрял.
Моя задача - получить данные с разных конечных точек REST и связать с ними класс kotlin, и я хочу, чтобы это происходило параллельно.
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDevices.send { asyncResult ->
if (asyncResult.succeeded())
deviceDevicePromise.complete(asyncResult.result())
else
deviceDevicePromise.fail("Http request failed");
}
return deviceDevicePromise.future()
}
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()
requestDeviceDevices.send { asyncResult ->
if (asyncResult.succeeded()) {
// this will return Json array and each element of that array needs to be called again in a loop.
val result = asyncResult.result().bodyAsJsonArray().map { device ->
constructDeviceDevice(deviceIP, device.toString(), webClient)
}
deviceDevicesPromise.complete(result)
} else
deviceDevicesPromise.fail("Http request failed")
}
return deviceDevicesPromise.future()
}
fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {
val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
// I need to call other rest points similar to this and I need map the result to kotlin class.
// how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>.
}
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = // list of IP strings
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val futureList = constructDevice(vertx, webClient, deviceIP)
CompositeFuture.all(futureList).onComplete { allFuturesResult ->
if (allFuturesResult.succeeded()) {
// how to handle individual future result here to construct data
} else {
println("failed")
}
}
}
}
}
Обновленный вопрос: 2
@Damian, как вы предложили, я обновил свой код.
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDevices.send { asyncResult ->
if (asyncResult.succeeded())
deviceDevicePromise.complete(asyncResult.result())
else
deviceDevicePromise.fail("Http request failed")
}
return deviceDevicePromise.future()
}
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDeviceDevices.send { asyncResult ->
if (asyncResult.succeeded()) {
deviceDevicesPromise.complete(asyncResult.result())
}
else
deviceDevicesPromise.fail("Http request failed")
}
return deviceDevicesPromise.future()
}
fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)
httpDevicesFuture.onComplete { ar ->
if(ar.succeeded()) {
val futureList = ar.result().bodyAsJsonArray().map { device ->
constructDeviceDevice(deviceIP, device.toString(), webClient)
}
CompositeFuture.all(futureList).onComplete { asyncResult ->
if (asyncResult.succeeded()) {
asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
//not all future in futureList are completed here some of them shows Future{unresolved}
}
constructDevicePromise.complete(DeviceFlow(label = "xyz"))
}
else {
constructDevicePromise.fail("failed")
}
}
}
}
return constructDevicePromise.future()
}
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = //list of IPs
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val constructDeviceFuture = constructDevice(webClient, deviceIP)
constructDeviceFuture.onComplete {ar ->
//println(ar.result().toString())
}
}
}
}
}
Моя проблема внутри
CompositeFuture.all(futureList).onComplete { asyncResult ->
if (asyncResult.succeeded()) {
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
здесь большинство фьючерсов не решены, и здесь вешается исполнение.
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
так что я изменился CompositeFuture.all(futureList).onComplete
к CompositeFuture.join(futureList).onComplete
согласно документу vert.x присоединение будет ждать завершения всего будущего
Композиция соединения ожидает, пока все фьючерсы не будут завершены с успехом или неудачей. CompositeFuture.join принимает несколько аргументов фьючерса (до 6) и возвращает фьючерс, который успешно завершен, когда все фьючерсы выполнены, и терпит неудачу, когда все фьючерсы завершены и хотя бы один из них не прошел.
но сейчас очень немногие фьючерсы терпят неудачу. Вот результат будущего списка после перехода наCompositeFuture.join
CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
Так мало фьючерсов терпят неудачу из-за того, что мое устройство не может обрабатывать параллельные запросы? также почему выполнение программы застряло внутри
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
Если это проблема с обработкой запросов параллелизма устройств, то каково другое решение этой проблемы. Можно ли запускать все эти вызовы отдыха из среды vertx и общаться с ней через шину событий?
Также, если я развернул DeviceDiscoverVerticle
как стандартная вертикаль вместо рабочей, приложение полностью застревает на CompositeFuture.all(futureList).onComplete
.
2 ответа
Зная немного больше, чего вы пытаетесь достичь, прежде всего в методе constructDeviceDevices()
Я бы изменил возвращаемый тип на просто Future<HttpResponse<Buffer>>
и если это удастся просто позвони deviceDevicesPromise.complete(asyncResult.result())
Затем в constructDevice()
метод, который я бы назвал нашим модифицированным constructDeviceDevices()
метод и получить из него будущий объект, назовем его Future<HttpResponse<Buffer>> httpDevicesFuture
. Следующим шагом будет позвонитьhttpDevicesFuture.onComplete(ar -> {<handler code>})
в этом обработчике у вас есть доступ к ar.result()
который является ответом от конечной точки ".../devices/", поэтому теперь в том же блоке я пропущу этот ответ и получу List<Future<HttpResponse<Buffer>>>
. Все еще оставаясь в том же квартале, я бы написалCompositeFuture.all(futuresList).onComplete(ar -> handler)
этот ar
будет типа CompositeFuture
у него есть метод list()
который фактически возвращает список завершенных фьючерсов (и в этом обработчике все они завершены), поэтому теперь, используя этот список, вы можете получить HttpResponse<Buffer>
для каждого будущего, и каждый из них будет вашим ответом ".../devices/$device", и вы можете сопоставить их с любыми объектами, которые захотите. Теперь в том же обработчике я бы решил, куда мне идти дальше, и, вероятно, сделал бы это, отправив сообщение на eventBus, напримерeventBus.send("HTTP_PROCESSING_DONE", serializedDevices)
или если что-то пойдет не так eventBus.send("HTTP_FAILURE", someMessage)
. Но в вашем случае, если вы хотите выполнить все это для каждого IP-адреса в некотором списке, а не заставлять его быть синхронным, тогда все еще в том же блоке вы можете выполнять сопоставление любых объектов и вызыватьconstructDeviceFuture.complete(mappedObject/List<MappedObject>)
это означает, что вам нужно создать еще одно будущее, из которого вы вернетесь constructDevice()
метод
В основном вы застряли, потому что пытаетесь воспроизвести последовательное выполнение в асинхронном мире, особенно в тот момент, когда вы пытаетесь вернуть значение из constructDevice()
Это означало бы, что мы действительно хотим дождаться завершения всего этого выполнения во время обработки этой строки кода, а в vert.x это не так.
это будет выглядеть примерно так (синтаксис, вероятно, отключен, поэтому относитесь к нему как к псевдокоду)
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDevices.send { asyncResult ->
if (asyncResult.succeeded())
deviceDevicePromise.complete(asyncResult.result())
else
deviceDevicePromise.fail("Http request failed");
}
return deviceDevicePromise.future()
}
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
val deviceDevicesPromise: Future<HttpResponse<Buffer>> = Promise.promise()
requestDeviceDevices.send { asyncResult ->
if (asyncResult.succeeded()) {
deviceDevicesPromise.complete(asyncResult.result())
} else
deviceDevicesPromise.fail("Http request failed")
}
return deviceDevicesPromise.future()
}
fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): Future<SomeDomainObject> {
//Type of below promise depends on what you are mapping responses to. It may also be a list of mapped objects
val constructDevicePromise: Promise<SomeDomainObject> = Promise.promise()
val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)
httpDevicesFuture.onComplete { ar ->
if (ar.succeeded()) {
val futureList: List<Future<HttpResponse<Buffer>>>
//loop through ar.result() and populate deviceDevicesFuture list
CompositeFuture.all(futureList).onComplete { allFuturesResult ->
if (allFuturesResult.succeeded()) {
// here you have access to allFuturesResult.list() method
// at this point you know all futures have finished, you can retrieve result from them (you may need to cast them from Object)
// when you have List<HttpResponse> you map it to whatever you want
val myMappedObject: SomeDomainObject = mappingResult()
constructDevicePromise.complete(myMappedObject)
} else {
constructDevicePromise.fail("failed")
}
}
}
}
return constructDevicePromise.future()
}
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = // list of IP strings
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
//here dependent on your logic, you handle each future alone or create a list and handle them together
val constructDeviceFuture: Future<SomeDomainObject> = constructDevice(vertx, webClient, deviceIP)
constructDeviceFuture.onComplete(ar -> {
ar.result() // <- this is your mapped object
eventBus.send("SOME_NEXT_LOGIC", serializedDomainObject)
})
}
//if you need to handle all devices at once, once again you need to make CompositeFuture from all responses of constructDevice
}
}
}
Ответ на обновление 2
Около CompositeFuture.all()
: Вы упускаете одну вещь, CompositeFuture.all()
ожидает, пока все фьючерсы завершатся успешно ИЛИ хотя бы один отказал.Если даже один из них потерпел неудачу, он не ждет других (это похоже на логическое И, не нужно ждать остальных, потому что мы уже знаем результат).CompositeFuture.join()
с другой стороны, просто ждет завершения всех фьючерсов, но все же, если кто-то из них терпит неудачу, результирующее будущее также будет неудачным (но вы должны, по крайней мере, получить результат для всех из них).
На самом деле это то, что вы видите в своем выводе, с CompositeFuture.all()
вы получаете кучу завершенных фьючерсов, одно неудачное, а остальные нерешенные.
Еще одна вещь, которой не хватает в этой части:
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val constructDeviceFuture = constructDevice(webClient, deviceIP)
constructDeviceFuture.onComplete {ar ->
//println(ar.result().toString())
}
}
}
вы не проверяете, если ar.succeeded()
, если вы хотите, вы увидите, что окончательное будущее на самом деле провалилось, поэтому конечный результат не такой, как вы ожидали.
Теперь просто размышляем о том, что происходит. Вы, вероятно, убиваете (до некоторой степени) этот API-интерфейс отдыха (я предполагаю, что это тот же API для каждого события vertx) с таким количеством одновременных запросов, если вы поместите какое-то сообщение журнала с точностью до миллисекунд внутри обработчика одного запроса, вы, вероятно, должны увидеть, что запросы несколько миллисекунд друг от друга. Я полагаю, что API может обслуживать несколько запросов, тогда следующий выходит из строя из-за какого-то исключения / блока / тайм-аута или чего-то еще, а все остальные, вероятно, вообще не получают ответа или ждут, пока они не достигнут некоторого таймаута. Если вы определите Verticle как стандартный, вы получите предупреждение, если что-то длится более двух секунд, более того, есть один поток, обрабатывающий все это, поэтому, если один запрос зависает на долгое время,стандартная вертикаль в это время полностью не реагирует. Вероятно, это причина, по которой вы застрялиCompositeFuture.join()
метод.
Итак, теперь вы можете делать несколько вещей:
Вы можете изменить параллельное выполнение на последовательное. По сути, вместо того, чтобы создавать
n
Futures заранее, вы создаете будущее для одного элемента, а затем вызываетеfuture.compose(ar -> {})
этот обработчик будет вызываться ТОЛЬКО по завершении будущего. Затем в том же обработчике вы создаете и возвращаете future для следующего элемента. Это немного сложно реализовать imo, но выполнимо (я сделал это, используя java stream reduce, чтобы уменьшитьx
будущее в одиночное). Когда вы реализуете это таким образом, у вас будет один запрос за раз, поэтому проблем с API не должно возникнуть. ОБРАТИТЕ ВНИМАНИЕ, что разные IP-адреса по-прежнему будут обрабатываться одновременно, но запросы на каждый IP-адрес будут последовательными, поэтому все может работать нормально.Вы можете создать еще одну стандартную вертикаль, которая будет реагировать только на одно событие, это событие будет вызывать конечную точку "/devices/$device". Теперь в коде, который у вас есть прямо сейчас, когда вы перебираете начальный HTTP-ответ, вместо того, чтобы порождать еще 20 HTTP-запросов, вы просто отправляете 20 событий в eventBus. Когда у вас есть только один экземпляр вертикали, обрабатывающей это конкретное сообщение, и это стандартная вертикаль только с одним потоком, фактически в данный момент должно быть обработано только одно сообщение, и оно должно просто стоять в очереди. Это также было бы очень легко настроить, потому что вы можете просто увеличить количество экземпляров вертикалей, и у вас будет столько одновременных запросов, сколько количество экземпляров вертикалей.
Вы упомянули, что полностью обрабатываете его вне vertx, я думаю, что в этом нет необходимости, но если вы решите, что это лучше для вас, то это довольно просто. Если у вас уже есть
Vertx
объект откуда-то еще, нет проблем передать этот объект конструктору другого класса. Там вы можете иметь свой собственный http-клиент, свои собственные методы, в основном все, что захотите, и в какой-то момент, когда вы решите, что хотите использовать vert.x, вы можете просто вызватьvertx.eventBus().send()
и запускать некоторую логику, которая будет обрабатываться vert.x. Самое важное, что нужно помнить, - это просто не создавать более одного экземпляраVertx
объект, поскольку у них будут отдельные автобусы событий. Фактически, как указано в документации
Вертикали... Эта модель не является обязательной, и Vert.x не заставляет вас создавать приложения таким образом, если вы этого не хотите.
Таким образом, вы можете написать обычное приложение в любой структуре и все же в какой-то момент просто создать экземпляр объекта Vertx, выполнить одну задачу и вернуться к своей базовой структуре, но, честно говоря, я думаю, что вы очень близки к решению этой проблемы:)
Я не знаком с kotlin и сопрограммами, но у меня могут быть предложения относительно самого vert.x. Прежде всего согласно документации
В большинстве случаев веб-клиент следует создавать один раз при запуске приложения, а затем использовать повторно. В противном случае вы потеряете множество преимуществ, таких как создание пула соединений, и можете потерять ресурсы, если экземпляры не будут закрыты должным образом.
Я вижу, что вы вызываете Webclient.create(vertx) внутри метода constructDevice, поэтому вы создаете новый WebClient каждый раз, когда отправляете событие listDevice, чтобы вы могли рассмотреть возможность его изменения.
Недавно у меня было очень похожее занятие, и в итоге я использовал Futures. Имейте в виду, что когда вы вызываете awaitResult, вы блокируете поток для ожидания асинхронного выполнения, если бы это была стандартная Verticle, вы действительно получили бы спам с предупреждениями о заблокированных потоках. Вместо этого вы можете создать обещание, завершить / не выполнить его внутри обработчика http, а вне обработчика вы просто вернете объект обещания.future(). Вне цикла вы можете обрабатывать все фьючерсы, разница в том, что обработка фьючерсов также будет асинхронной, поэтому вы не будете блокировать поток.
Кроме того, чтобы код был немного чище и использовался асинхронный характер vert.x, было бы хорошо разделить обработку http и mongo на отдельные вершины, т.е.
- HttpVerticle получает событие listDevice
- HttpVerticle создает 5 фьючерсов для 5 разных запросов
- Когда все фьючерсы завершаются, срабатывает future.onComplete()/ CompositeFuture.all() и отправляет событие updateDB
- MongoVerticle получает и обрабатывает событие updateDB
Ваша конкретная проблема, вероятно, здесь не рассматривается, но я надеюсь, что она приведет вас хотя бы на один шаг вперед
После комментария вот пример фьючерсов на java
public class HttpVerticle extends AbstractVerticle {
WebClient webClient;
@Override
public void start() throws Exception {
webClient = WebClient.create(vertx);
vertx.eventBus().consumer("run_multiple_requests", event -> {
//When event is received this block is handled by some thread from worker pool, let's call it 'main thread'
Promise<HttpResponse<Buffer>> request1Promise = Promise.promise();
Promise<HttpResponse<Buffer>> request2Promise = Promise.promise();
Promise<HttpResponse<Buffer>> request3Promise = Promise.promise();
//Since webclient is async, all calls will be asynchronous
webClient.get("ip1", "/endpoint")
.send(asyncResult -> {
//async block #1 if it's worker verticle, it's probably picked up by another thread
//here we specify that our promise finished or failed
if (asyncResult.succeeded()) {
request1Promise.complete(asyncResult.result());
} else {
request1Promise.fail("Http request failed");
}
});
//at this point async block #1 is probably still processing
webClient.get("ip2", "/endpoint")
.send(asyncResult -> {
//async block #2 if it's worker verticle, it's probably picked up by another thread
//here we specify that our promise finished or failed
if (asyncResult.succeeded()) {
request2Promise.complete(asyncResult.result());
} else {
request2Promise.fail("Http request failed");
}
});
//at this point async block #1 and #2 are probably still processing
webClient.get("ip3", "/endpoint")
.send(asyncResult -> {
//async block #3 if it's worker verticle, it's probably picked up by another thread
//here we specify that our promise finished or failed
if (asyncResult.succeeded()) {
request3Promise.complete(asyncResult.result());
} else {
request3Promise.fail("Http request failed");
}
});
//retrieving futures from promises
Future<HttpResponse<Buffer>> future1 = request1Promise.future();
Future<HttpResponse<Buffer>> future2 = request2Promise.future();
Future<HttpResponse<Buffer>> future3 = request3Promise.future();
CompositeFuture.all(future1, future2, future3).onComplete(allFuturesResult -> {
//async block #4 this will be executed only when all futures complete, but since it's async it does
// not block our 'main thread'
if (allFuturesResult.succeeded()) {
//all requests succeeded
vertx.eventBus().send("update_mongo", someMessage);
} else {
//some of the requests failed, handle it here
}
});
//at this point async block #1 #2 #3 are probably still processing and #4 is waiting for callback
//but we leave our event handler and free 'main thread' without waiting for anything
});
}
Конечно, этот код может (и должен) быть намного короче, все это жестко запрограммировано без каких-либо массивов и циклов просто для ясности
Если вы используете logback или log4j (другие, вероятно, тоже), вы можете поместить [%t] в шаблон журнала, он покажет вам имя потока в сообщении журнала, для меня лично очень полезно понять поток всех этих асинхронных блоков
Еще одна вещь: при такой настройке все три запроса будут отправляться практически одновременно, поэтому убедитесь, что http-сервер может обрабатывать несколько запросов одновременно.