Как реализовать NIO Socket (клиент), используя сопрограммы Kotlin в Java Code?
Я хочу использовать сопрограммы Kotlin(v1.3.0) и java.nio.channels.SocketChannel (NIO) заменяет Socket connect
(блокировка ввода-вывода) в Android. потому что это может сэкономить много потоков.
Код ниже не может работать из-за job.await()
Это функция приостановки в Kotlin, это просто можно вызвать в блоке сопрограмм Ktolin. лайк launch{..}
, async{..}
,
// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {
// Start a new connection
// Create a non-blocking socket channel
val socketChannel = SocketChannel.open()
socketChannel.configureBlocking(false)
// async calls NIO connect function
val job = GlobalScope.async(oneThreadCtx) {
aConnect(socketChannel, address)
}
// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()
return socketChannel
}
Но я пытался использовать runBlocking{..}
сделать эту функцию нормальной функцией в Java. но job.await
заблокирован текущий поток Java, не приостановлен.
Итак, как мне реализовать эту функцию с сопрограммами Kotlin(v1.3.0)?
2 ответа
Как отметил Марко, ваш код все равно будет блокировать поток, даже если эта операция блокировки находится в асинхронной сопрограмме. Чтобы действительно получить желаемое асинхронное поведение с Java и Kotlin, вам нужно использовать Async-версию Socket Channel.
Благодаря этому вы получаете истинную асинхронную обработку сокетов. С помощью этого класса и метода Kottlin suspendCoroutine, вы можете превратить асинхронные обработчики в приостановленные вызовы.
Вот пример реализации этого для чтения:
class TcpSocket(private val socket: AsynchronousSocketChannel) {
suspend fun read(buffer: ByteBuffer): Int {
return socket.asyncRead(buffer)
}
fun close() {
socket.close()
}
private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
return suspendCoroutine { continuation ->
this.read(buffer, continuation, ReadCompletionHandler)
}
}
object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
override fun completed(result: Int, attachment: Continuation<Int>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Int>) {
attachment.resumeWithException(exc)
}
}
}
Вы можете удалить упаковку, которую я здесь делаю, и просто выставить asyncRead
метод на AsynchronousSocketChannel, например, так:
suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
return suspendCoroutine { continuation ->
this.read(buffer, continuation, ReadCompletionHandler)
}
}
object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
override fun completed(result: Int, attachment: Continuation<Int>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Int>) {
attachment.resumeWithException(exc)
}
}
Все дело вкуса и каковы ваши цели дизайна. Вы должны иметь возможность реализовать аналогичный метод для первоначального подключения, как я сделал здесь для чтения.
// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()
Это не реалистичное ожидание. С точки зрения Java, suspend fun
приостанавливает его выполнение, возвращая специальную константу, COROUTINE_SUSPENDED
, Вам нужен компилятор Kotlin, чтобы скрыть это от вас и позволить вам писать обычный код, который может быть приостановлен.
Вашему коду не хватает неблокирующей приостановки даже с точки зрения Kotlin, потому что он использует блокирующий вызов для подключения. Отправка этого вызова в другой поток не делает его неблокирующим.
То, что делает ваш код, полностью эквивалентно отправке задания в службу Java-исполнителя и ожиданию его результата. Вы можете, например, использовать CompletableFuture.supplyAsync
,