Kotlin Coroutines: ожидание завершения нескольких потоков

Итак, глядя на Coroutines в первый раз, я хочу параллельно обрабатывать загрузку данных и ждать ее завершения. Я смотрел вокруг и видел RunBlocking и Await и т. Д., Но не знал, как его использовать.

У меня пока есть

val jobs = mutableListOf<Job>()
jobs += GlobalScope.launch { processPages(urls, collection) }
jobs += GlobalScope.launch { processPages(urls, collection2) }
jobs += GlobalScope.launch { processPages(urls, collection3) }

Затем я хочу знать / ждать, пока они закончат

3 ответа

Решение

Вам не нужно вручную отслеживать ваши текущие задания, если вы используете концепцию структурированного параллелизма. Предполагая, что ваш processPages Функция выполняет некоторую блокировку ввода-вывода, вы можете инкапсулировать ваш код в следующую функцию приостановки, которая выполняет ваш код в диспетчере ввода-вывода, предназначенном для такой работы:

suspend fun processAllPages() = withContext(Dispatchers.IO) { 
    // withContext waits for all children coroutines 
    launch { processPages(urls, collection) }
    launch { processPages(urls, collection2) }
    launch { processPages(urls, collection3) }
}

Теперь, если самая верхняя функция вашего приложения еще не является функцией приостановки, то вы можете использовать runBlocking звонить processAllPages:

runBlocking {
    processAllPages()
}

Ты можешь использовать async Функция компоновщика для параллельной обработки загрузки данных:

class Presenter {
    private var job: Job = Job()
    private var scope = CoroutineScope(Dispatchers.Main + job) // creating the scope to run the coroutine. It consists of Dispatchers.Main (coroutine will run in the Main context) and job to handle the cancellation of the coroutine.

    fun runInParallel() {
        scope.launch { // launch a coroutine
            // runs in parallel
            val deferredList = listOf(
                    scope.asyncIO { processPages(urls, collection) },
                    scope.asyncIO { processPages(urls, collection2) },
                    scope.asyncIO { processPages(urls, collection3) }
            )

            deferredList.awaitAll() // wait for all data to be processed without blocking the UI thread

            // do some stuff after data has been processed, for example update UI
        }
    }

    private fun processPages(...) {...}

    fun cancel() {
        job.cancel() // invoke it to cancel the job when you don't need it to execute. For example when UI changed and you don't need to process data
    }
}

Функция расширения asyncIO:

fun <T> CoroutineScope.asyncIO(ioFun: () -> T) = async(Dispatchers.IO) { ioFun() } // CoroutineDispatcher - runs and schedules coroutines

GlobalScope.launch Не рекомендуется использовать, если вы не хотите, чтобы сопрограмма работала в течение всего срока службы приложения и не была отменена преждевременно.

Редактировать: как отметил Роман Елизаров, вы можете попытаться не использовать awaitAll() функция, если вы не хотите обновить интерфейс или сделать что-то еще сразу после обработки всех данных.

Можно использовать следующий подход.

      fun myTask() {
    GlobalScope.launch {
        val task = listOf(
            async {

            },
            async {

            }
        )
        task.awaitAll()

    }
}
Другие вопросы по тегам