Koltin Vertx с блоками Coroutines при попытке вызвать блокировку запуска
Я использую стороннюю библиотеку, которая выставила функцию обратного вызова. Функция обратного вызова будет вызвана в случае успеха. Функция обратного вызова не является функцией приостановки, но когда я пытаюсь сделать вызов внутри функции без приостановки, чтобы вернуть результат функции приостановки, которая делает вызов ввода-вывода с использованием aysnc и await, вызов никогда не выполняется. Ниже я предложил простой фрагмент кода, который демонстрирует проблему.
open class TestVerticle: CoroutineVerticle() {
override suspend fun start() {
awaitBlockingExample()
}
fun awaitBlockingExample():String {
val future= async(vertx.dispatcher()) {
makeSuspendFunCall()
}
val result:String= runBlocking(vertx.dispatcher()){future.await()}
println(" The final Result is $result")
return result
}
suspend fun makeSuspendFunCall():String{
println("Comming here 3")
delay(500)
val resp="Test"
return resp
}
}
fun main(args: Array<String>) = runBlocking {
Vertx.vertx().deployVerticle("TestVerticle")
}
Программа запускает штрафы, если я уберу функцию задержки в makeSuspendFunCall, но она зависнет, если я добавлю функцию задержки. Я на самом деле имитирую функцию приостановки сетевого вызова, используя здесь функцию задержки. Как я могу получить результат от awaitBlockingExample в этом сценарии? Я ясно понимаю, что, сделав awaitBlockingExample в качестве функции приостановки, я смогу выполнить эту работу, удалить асинхронные и выполнить блокирующие вызовы внутри. Но здесь awaitBlockingExample (не приостановленная функция) представляет реализацию, предоставленную библиотекой этой стороны, где она переопределена в нашей реализации. Например, кеш guava предоставляет функцию перезагрузки, я хотел бы переопределить функцию перезагрузки (функция без приостановки) и вызвать функцию сопрограммы из метода перезагрузки для обновления значения кэша из базы данных или сетевого вызова.
3 ответа
Проблема в том, что vertx.dispatcher()
использует один поток в качестве цикла событий и runBlocking
блокирует эту тему.
Подробности:
Ваш awaitBlockingExample()
функция работает в этом потоке цикла событий Vertx, потому что она запускается из suspend start()
функция. Если вы вызываете runBlocking()
этот Vertx-поток заблокирован и никогда не освобождается. Но ваши другие сопрограммы, например, async()
Теперь нет нити, чтобы сделать свою работу.
Решение:
Я предполагаю, что вызов awaitBlockingExample
от start
Функция происходит только в этом примере. В действительности я бы предположил, что внешний обратный вызов использует свой собственный поток. Тогда нет никаких проблем, потому что теперь внешний поток заблокирован:
override suspend fun start() {
//simulate own thread for external callback
thread {
awaitBlockingExample()
}
}
fun awaitBlockingExample():String {
val future= async(vertx.dispatcher()) {
makeSuspendFunCall()
}
val result:String= runBlocking(vertx.dispatcher()){future.await()}
println(" The final Result is $result")
return result
}
Кстати: вам не требуется async()
блок, вы можете напрямую позвонить makeSuspendFunCall()
от runBlocking()
fun awaitBlockingExample():String = runBlocking(vertx.dispatcher()){
val result = makeSuspendFunCall()
println(" The final Result is $result")
result
}
Для Kotlin 1.3.0 и выше
private val mainScope = CoroutineScope(Dispatchers.Main)
fun start(){
mainScope.launch {
val data = withContext(Dispatchers.IO){
//This function will return the result. Return type of the below function will be type of data variable above.
awaitBlockingExample()
}
//use your data result from async call. Result will be available here as soon as awaitBlockingExample() return it.
}
//Your function will continue execution without waiting for async call to finish.
}
fun awaitBlockingExample():String {
//Your Logic
}
Надеюсь, это поможет.
Попробуйте следующий подход:
override fun start() {
GlobalScope.launch {
val result = awaitBlockingExample()
}
}
suspend fun awaitBlockingExample(): String {
val response = makeSuspendFunCall()
println(" The final Result is $response")
return response
}
suspend fun makeSuspendFunCall():String{
println("Comming here 3")
return suspendCoroutine {
delay(500)
val resp="Test"
it.resume(resp)
}
}