Как реализовать NIO Socket (клиент), используя сопрограммы Kotlin в Java Code?

Я хочу использовать сопрограммы Kotlin(v1.3.0) и java.nio.channels.SocketChannel (NIO) заменяет Socket connect (блокировка ввода-вывода) в Android. потому что это может сэкономить много потоков.

Код ниже не может работать из-за job.await() Это функция приостановки в Kotlin, это просто можно вызвать в блоке сопрограмм Ktolin. лайк launch{..}, async{..},

// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {

    // Start a new connection
    // Create a non-blocking socket channel
    val socketChannel = SocketChannel.open()
    socketChannel.configureBlocking(false)

    // async calls NIO connect function
    val job = GlobalScope.async(oneThreadCtx) {
        aConnect(socketChannel, address)
    }

    // I what to suspend(NOT block) current Java Thread, until connect is success
    job.await()

    return socketChannel
}

Но я пытался использовать runBlocking{..} сделать эту функцию нормальной функцией в Java. но job.await заблокирован текущий поток Java, не приостановлен.

Итак, как мне реализовать эту функцию с сопрограммами Kotlin(v1.3.0)?

2 ответа

Как отметил Марко, ваш код все равно будет блокировать поток, даже если эта операция блокировки находится в асинхронной сопрограмме. Чтобы действительно получить желаемое асинхронное поведение с Java и Kotlin, вам нужно использовать Async-версию Socket Channel.

Благодаря этому вы получаете истинную асинхронную обработку сокетов. С помощью этого класса и метода Kottlin suspendCoroutine, вы можете превратить асинхронные обработчики в приостановленные вызовы.

Вот пример реализации этого для чтения:

class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun read(buffer: ByteBuffer): Int {
        return socket.asyncRead(buffer)
    }

    fun close() {
        socket.close()
    }

    private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
        return suspendCoroutine { continuation ->
           this.read(buffer, continuation, ReadCompletionHandler)
        }
    }

    object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) {
            attachment.resume(result)
        }

        override fun failed(exc: Throwable, attachment: Continuation<Int>) {
            attachment.resumeWithException(exc)
        }
    }
}

Вы можете удалить упаковку, которую я здесь делаю, и просто выставить asyncRead метод на AsynchronousSocketChannel, например, так:

suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
    return suspendCoroutine { continuation ->
       this.read(buffer, continuation, ReadCompletionHandler)
    }
}

object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
    override fun completed(result: Int, attachment: Continuation<Int>) {
        attachment.resume(result)
    }

    override fun failed(exc: Throwable, attachment: Continuation<Int>) {
        attachment.resumeWithException(exc)
    }
}

Все дело вкуса и каковы ваши цели дизайна. Вы должны иметь возможность реализовать аналогичный метод для первоначального подключения, как я сделал здесь для чтения.

// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()

Это не реалистичное ожидание. С точки зрения Java, suspend fun приостанавливает его выполнение, возвращая специальную константу, COROUTINE_SUSPENDED, Вам нужен компилятор Kotlin, чтобы скрыть это от вас и позволить вам писать обычный код, который может быть приостановлен.

Вашему коду не хватает неблокирующей приостановки даже с точки зрения Kotlin, потому что он использует блокирующий вызов для подключения. Отправка этого вызова в другой поток не делает его неблокирующим.

То, что делает ваш код, полностью эквивалентно отправке задания в службу Java-исполнителя и ожиданию его результата. Вы можете, например, использовать CompletableFuture.supplyAsync,

Другие вопросы по тегам