StreamDelegate прекращает получать события после количества чтений в swift

У меня есть приложение для чата iOS, которое получает сообщения через сокетное соединение.

Когда пользователь открывает приложение через долгое время и появляется более 50 непрочитанных сообщений, сервер отправляет через сокет сообщение, указывающее количество непрочитанных сообщений, в этот момент приложение отображает предупреждение с индикатором выполнения, а затем сервер отправляет каждое сообщение. одно сообщение.

Таким образом, приложение получает каждое сообщение в потоке метода StreamDelegate (_ stream: Stream, handle eventCode: Stream.Event) и обновляет индикатор выполнения до конца сообщений.

Проблема в том, что когда в какой-то момент у меня появляется большое количество непрочитанных сообщений (около 300+), StreamDelegate перестает получать события с сообщениями, и сообщения об ошибках не отображаются.

Я вызываю метод подключения в глобальной очереди:

DispatchQueue.global().async {
    self.connect(host, port: port)
}

Это мой код подключения сокета:

    fileprivate func connect(_ host: String, port: Int) {

        postStatus(.connecting)

        self.host = NSString(string: host)
        self.port = UInt32(port)

        self.log("connect to \(host):\(port)")

        var readStream : Unmanaged<CFReadStream>?
        var writeStream : Unmanaged<CFWriteStream>?

        CFStreamCreatePairWithSocketToHost(nil, self.host, self.port, &readStream, &writeStream)

        self.inOk = false
        self.outOk = false
        self.inputStream = readStream!.takeRetainedValue()
        self.outputStream = writeStream!.takeRetainedValue()

        self.inputStream.delegate = self
        self.outputStream.delegate = self


        let mainThread = Thread.isMainThread;

        let loop = mainThread ? RunLoop.main : RunLoop.current

        self.inputStream.schedule(in: loop, forMode: RunLoopMode.defaultRunLoopMode)
        self.outputStream.schedule(in: loop, forMode: RunLoopMode.defaultRunLoopMode)

        self.inputStream.open()
        self.outputStream.open()

        self.timer = Timer.scheduledTimer(timeInterval: 5, target: self, selector: #selector(connectionTimeout), userInfo: nil, repeats: false)

        if(!mainThread) {
            loop.run()
        }

    }

В потоке метода StreamDelegate (_ stream: Stream, обрабатывать eventCode: Stream.Event) я получаю событие сообщения и обрабатываю его методом read(String)

    case Stream.Event.hasBytesAvailable:

        if let timer = timer {
            timer.invalidate()
            self.timer = nil
        }

        let json = ChatLibSwift.readMessage(self.inputStream)

        do {
            if StringUtils.isNotEmpty(json) {
                try self.read(json)
            }
        } catch let ex as NSError {
            LogUtils.log("ERROR: \(ex.description)")
        }

        break
    case Stream.Event.hasSpaceAvailable:
        break

Метод, который читает каждое сообщение:

static func readMessage(_ inputStream: InputStream) -> String {

    do {
        var lenBytes = [UInt8](repeating: 0, count: 4)


        inputStream.read(&lenBytes, maxLength: 4)

        // header

        let i32: Int = Int(UInt32.init(lenBytes[3]) | UInt32.init(lenBytes[2]) << 8 | UInt32.init(lenBytes[1]) << 16 | UInt32.init(lenBytes[0]) << 24 )

        var msg = [UInt8](repeating: 0, count: (MemoryLayout<UInt8>.size * Int(i32)))

        let bytesRead = inputStream.read(&msg, maxLength: Int(i32))

        if bytesRead == -1 {
            print("<< ChatLib ERROR -1")
            return ""
        }

        let s = NSString(bytes: msg, length: bytesRead, encoding: String.Encoding.utf8.rawValue) as String?

        if let s = s {
            if bytesRead == Int(i32) {
                return s
            }
            else {
                print("Error: readMessage \(s)")
            }
            return s
        }
        return ""
    } catch {

        return ""
    }
}

У кого-нибудь есть идеи, как это решить?

1 ответ

Основная идея состоит в том, чтобы принудительно планировать чтение потока после успешной операции чтения:

let _preallocatedBufferSize = 64 * 1024
var _preallocatedBuffer = [UInt8](repeating: 0, count: MemoryLayout<UInt8>.size * Int(_preallocatedBufferSize))

var message : ....

func readMessage(_ inputStream: InputStream) {

    if !inputStream.hasBytesAvailable || message.isCompleted {
        return
    }

    var theBuffer : UnsafeMutablePointer<UInt8>?
    var theLength : Int = 0

    // try to get buffer from the stream otherwise use the preallocated buffer
    if !inputStream.getBuffer(&theBuffer, length:&theLength) || nil == theBuffer
    {
        memset(&_preallocatedBuffer, 0, _preallocatedBufferSize)

        let theReadCount = inputStream.read(&_preallocatedBuffer, maxLength:_preallocatedBufferSize)
        if theReadCount > 0 {
            theBuffer = _preallocatedBuffer;
            theLength = theReadCount;
        } else {
            theBuffer = nil;
            theLength = 0;
        }
    }

    if nil != theBuffer && theLength > 0 {
        _message.appendData(theBuffer, length:theLength)

        self.perform(#selector(readMessage), with:inputStream, afterDelay:0.0, inModes:[RunLoopMode.defaultRunLoopMode])
    }
}
Другие вопросы по тегам