Как использовать рабочие очереди в SwiftNIO?
У меня есть сервер Swift NIO HTTP2, который обрабатывает запрос в цикле событий контекста. Но я хочу обработать запрос в другом потоке, пуле потоков GCD aync, получить результат и отправить его.
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
context.eventLoop.execute {
context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
// ...
var buffer = context.channel.allocator.buffer(capacity: respBody.count)
buffer.writeString(respBody)
context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
}.whenComplete { _ in
context.close(promise: nil)
}
}
}
Если я изменю его, чтобы использовать глобальную очередь GCD, как мне вернуть EventLoopFuture<Void>
ответ?
context.eventLoop.execute {
context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
DispatchQueue.global().async {
return self.send("hello world new ok", to: context.channel).whenComplete({ _ in
_ = context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
context.close(promise: nil)
})
}
}
}
Можно ли использовать таким образом глобальную очередь GCD или как я буду использовать рабочие потоки?
Функция send string вызывает следующую функцию для записи тела.
private func sendData(_ data: Data, to channel: Channel, context: StreamContext) -> EventLoopFuture<Void> {
let headers = self.getHeaders(contentLength: data.count, context: context)
_ = self.sendHeader(status: .ok, headers: headers, to: channel, context: context)
var buffer = channel.allocator.buffer(capacity: data.count)
buffer.writeBytes(data)
let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
return channel.writeAndFlush(part)
}
1 ответ
Правила в SwiftNIO:
- операции на
Channel
s потокобезопасны, так что вы можете делать их из любого потока или очереди - операции на
ChannelHandlerContext
не являются потокобезопасными и должны выполняться только изChannelHandler
, ВсеChannelHandler
события называются справаEventLoop
,
Так что ваш пример почти верен, просто убедитесь, что вы используете только Channel
и никогда ChannelHandlerContext
из DispatchQueue
или любой другой поток (это не канал EventLoop
).
let channel = context.channel // save on the EventLoop
channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
DispatchQueue.global().async {
self.send("hello world new ok", to: channel).flatMap {
channel.writeAndFlush(HTTPServerResponsePart.end(nil))
}.whenComplete {
channel.close(promise: nil)
}
}
}
Есть одно предположение, которое я делаю здесь: self.send
нормально с вызовом из любого потока и не использует ChannelHandlerContext
что вы могли бы хранить на self
, Чтобы оценить, если self.send
здесь все в порядке, мне нужно знать, что именно он делает.
Кроме того, в вашем первом фрагменте кода у вас есть избыточный eventloop.execute
:
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
// eventLoop.execute not necessary here
context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
// ...
var buffer = context.channel.allocator.buffer(capacity: respBody.count)
buffer.writeString(respBody)
context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
}.whenComplete { _ in
context.close(promise: nil)
}
}
context.eventLoop.execute
не нужно, потому что любое событие на ChannelHandler
всегда вызывается в правильном EventLoop
,