Серверный поток внутри класса qt (нужен мьютекс?)

Я создал этот серверный класс, который запускает поток, когда приходит новое соединение. В некоторых случаях он работает нормально, но не очень стабильно. Я пытаюсь решить, где это ломается. Мой отладчик говорит мне кое-что о qmutex. Если кто-то может определить проблему. ти

Он соединяется с родителем с помощью сигнала и слотов и получает данные обратно.

Вот заголовок:

#ifndef FORTUNESERVER_H
#define FORTUNESERVER_H

#include <QStringList>
#include <QTcpServer>
#include <QThread>
#include <QTcpSocket>
#include <string>
using namespace  std;


class FortuneServer : public QTcpServer
{
    Q_OBJECT

 public:
    FortuneServer(QObject *parent = 0);

public slots:


void procesServerString(string serverString);
void getStringToThread(string serverString);

protected:
void incomingConnection(int socketDescriptor);

private:
QStringList fortunes;

signals:

void procesServerStringToParent(string serverString);
void getStringToThreadSignal(string serverString);
};


class FortuneThread : public QObject
 {
Q_OBJECT

public:
FortuneThread(int socketDescriptor, QObject *parent);

public slots:

void getString();
void sendString(string sendoutString);

signals:

void error(QTcpSocket::SocketError socketError);
void fromThreadString(string serverString);
void finished();


private:
int socketDescriptor;
QString text;
QTcpSocket tcpSocket;
};

#endif

и копия:

#include <stdlib.h>
#include <QtNetwork>
#include "MeshServer.hh"
#include <iostream>
#include "TableView.hh"

using namespace  std;

FortuneServer::FortuneServer(QObject *parent)
: QTcpServer(parent)
{

}

void FortuneServer::procesServerString(string serverString){

emit procesServerStringToParent(serverString);

}
void FortuneServer::getStringToThread(string serverString){

emit getStringToThreadSignal(serverString);

}

void FortuneServer::incomingConnection(int socketDescriptor)
{


FortuneThread *serverthread = new FortuneThread(socketDescriptor, this);
//connect(&serverthread, SIGNAL(finished()), &serverthread, SLOT(deleteLater()));


QThread* thread = new QThread;

serverthread->moveToThread(thread);

connect(thread, SIGNAL(started()), serverthread, SLOT(getString()));
connect(serverthread, SIGNAL(fromThreadString(string)), this,        SLOT(procesServerString(string)));
connect(this, SIGNAL(getStringToThreadSignal(string)), serverthread, SLOT(sendString(string)));

connect(serverthread, SIGNAL(finished()), thread, SLOT(quit()));
connect(serverthread, SIGNAL(finished()), serverthread, SLOT(deleteLater()));
connect(serverthread, SIGNAL(finished()), thread, SLOT(deleteLater()));

thread->start();

}



FortuneThread::FortuneThread(int socketDescriptor, QObject *parent)
: QObject(parent), socketDescriptor(socketDescriptor)
{



}

void FortuneThread::getString()
{

if (!tcpSocket.setSocketDescriptor(socketDescriptor)) {
    emit error(tcpSocket.error());
    cout<<"socket error"<<endl;
    return;
}
//in part
if(!tcpSocket.waitForReadyRead(10000)){

    emit finished();
    return;
}
int joj = tcpSocket.bytesAvailable();
char inbuffer[1024];
tcpSocket.read(inbuffer,1024);
string instring;
instring = inbuffer;
instring.resize(joj);

emit fromThreadString(instring);

}   


void FortuneThread::sendString(string sendoutString)
{       


//out part
char buffer[1024];
int buffer_len = 1024;
int bytecount;

memset(buffer, '\0', buffer_len);


string outstring = sendoutString;



int TempNumOne= (int)outstring.size();

for (int a=0;a<TempNumOne;a++)
    {
        buffer[a]=outstring[a];
    }

QByteArray block;
block = buffer;



tcpSocket.write(block);
tcpSocket.disconnectFromHost();
tcpSocket.waitForDisconnected();
emit finished();
}

это от родителя:

//server start

QHostAddress adr;
adr.setAddress( QString("127.0.0.1") );
adr.toIPv4Address();
quint16 port = 1101;

if (!server.listen( adr, port)) {
  QMessageBox::critical(this, tr("CR_bypasser"),
      tr("Unable to start the server: %1.")
      .arg(server.errorString()));
  close();
  return;
}

QString ipAddress;
ipAddress = server.serverAddress().toString();
statusLabel->setText(tr("The server is running on\n\nIP: %1\nport: %2\n\n"
  "Run the Fortune Client example now.")
  .arg(ipAddress).arg(server.serverPort()));

connect (&server, SIGNAL(procesServerStringToParent(string)), this,  SLOT(procesServerString(string))); 
connect (this, SIGNAL(StringToServer(string)), &server, SLOT(getStringToThread(string))); 

редактировать: что я пытаюсь сделать:

У меня есть клиент (часть игрового движка (Cryengine)), который я сделал для отправки строки игровых координат и некоторых других вещей с сокетом, как это было сделано по ссылке, которую я дал ранее. Это работает хорошо. Я получаю данные через порт "127.0.0.1" 1101. Теперь мне просто нужно, чтобы эти данные были оценены в моей собственной программе, которая имеет этот класс TableView, внутри которого я могу собирать координаты, которые я получаю из строки, вычислять некоторые данные из координат и затем верните эту новую строку обратно через сервер в gameengine. В игре я нажму на объекты, чтобы получить их coor., Сделаю из них строку (содержащую coor, entityid и т. Д.), Отправлю эту строку на сервер, который возвращает информацию о callculated из TableView. Мне просто нужен этот односторонний поток только один клиент, который отправляет строки. Я не уверен насчет recv(hsock, buffer, buffer_len, 0), я думаю, что узел, отвечающий за отправку строки в игре, будет ожидать возврата строки? Это одна из моих первых программ, я действительно запутался...

2 ответа

Решение

Код, который вы представляете, является примером кодирования грузового культа: вы делаете разные ненужные вещи, очевидно, в надежде решить проблему.

Вероятный Крашер...

Есть много проблем с кодом, но я думаю, что причина сбоя заключается в следующем: tcpSocket.write(block) не отправляет строку с нулевым символом в конце. Блок заканчивается нулем, но присвоение байтовому массиву не добавляет это нулевое завершение к size() QByteArray. Следующий код печатает 1, даже если внутри содержимого байтового массива имеется нулевой завершающий байт.

QByteArray arr = "a";
qDebug("len=%d", arr.size());

Получающий код ожидает нулевого завершения, но никогда не получает его. Затем вы переходите к назначению буфера с нулевым символом в конце std::string:

string instring;
instring = inbuffer;
instring.resize(joj);

Последующее изменение размера - культ груза: вы пытаетесь решить проблему после std::string & std::string::operator=(const char*) уже прочитал мимо вашего буфера, по всей вероятности.

Не принимайте это, чтобы означать, что исправление - это правильный путь. Не за что. Правильный способ - удалить написанный вами код и делать это правильно, без множества ненужных заклинаний, которые не помогают.

... и все другие проблемы

Вы попали в ловушку веры в магию, бесконечно увековеченные на разных форумах.

Потоки не являются магическими объектами, которые вы можете просто применить к любой проблеме в надежде, что они помогут. Я не знаю, что заставляет людей думать, что темы волшебны, но эмпирическое правило таково:если кто-то говорит вам: "О, вы должны попробовать темы", они, скорее всего, ошибаются.Если они говорят, что по отношению к сети, они почти никогда не правы, они бесполезны, и они совсем не понимают вашу проблему (кажется, и вы тоже). Чаще всего темы не помогут, если вы четко не поймете свою проблему. Сетевая система Qt является асинхронной: она не блокирует выполнение вашего кода, если вы не используете waitxxxx() функции. Кстати, не стоит их использовать, так что здесь все хорошо. Нет необходимости в миллиарде нитей.

Таким образом, совершенно необязательно запускать новый поток для каждого входящего соединения. Это снизит производительность вашего сервера - особенно если сервер выполняет простую обработку, потому что вы добавляете накладные расходы на переключение контекста и создание / демонтаж потока для каждого соединения. Вам нужно менее 2 потоков на каждое ядро ​​в вашей системе, поэтому используйте QThread::idealThreadCount() для количества потоков в пуле будет хорошей отправной точкой.

Вы также лишаете себя возможности многопоточности, поскольку вы используете сетевой поток только для получения данных, а затем отправляете fromThreadString(string) сигнал. Я предполагаю, что сигнал отправляется в основной поток вашего приложения. Теперь это просто глупо, потому что получение множества байтов из сетевого сокета совершенно тривиально. Ваши темы не выполняют никакой работы, вся работа, которую они выполняют, тратится на их создание и удаление.

Приведенный ниже код является простым примером того, как можно правильно использовать API-интерфейсы Qt для реализации системы клиент-сервер, которая распределяет работу по физическим ядрам в циклическом порядке. Это должно работать довольно хорошо. Пример клиента Fortune, включенный в Qt, действительно очень неудачен, потому что это совершенно неправильный путь.

Что можно заметить, так это:

  1. Это не совсем тривиально. Qt может быть более полезным, но это не так.

  2. И клиенты, и отправители перемещаются в потоки из пула потоков.

  3. Отключенные клиенты не удаляются, а просто возвращаются в список клиентов, которые хранятся в пуле протекторов. Они используются повторно, когда к клиенту обращаются.

  4. QThread не является производным от. QTcpServer создается только для доступа к дескриптору сокета.

  5. Нет функций, название которых начинается с wait() используются. Все обрабатывается асинхронно.

  6. ThreadPool хранит искомый QMetaMethod для newConnection(int) слот клиента. Это быстрее чем использование QMetaObject::invokeMethod() поскольку это должно искать вещи каждый раз.

  7. Таймер, работающий в главном потоке, запускает цепочку сигнальных интервалов, удаляя первого отправителя. Удаление каждого отправителя вызывает удаление следующего. В конце концов, последний отправитель quit() слот в пуле потоков. Последний испускает finished() сигнал, когда все потоки действительно закончены.

#include <QtCore/QCoreApplication>
#include <QtNetwork/QTcpServer>
#include <QtNetwork/QTcpSocket>
#include <QtCore/QQueue>
#include <QtCore/QThread>
#include <QtCore/QTimer>
#include <QtCore/QMetaMethod>

// Processes data on a socket connection
class Client : public QObject
{
    Q_OBJECT
public:
    Client(QObject* parent = 0) : QObject(parent), socket(new QTcpSocket(this))
    {
        connect(socket, SIGNAL(readyRead()), SLOT(newData()));
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                SLOT(newState(QAbstractSocket::SocketState)));
        qDebug("Client()");
    }
    ~Client() { qDebug("~Client()"); }
signals:
    void done();
public slots:
    void newConnection(int descriptor) {
        socket->setSocketDescriptor(descriptor);
    }
private slots:
    void newData() {
        QByteArray data = socket->readAll();
        if (0) qDebug("got %d bytes", data.size());
        if (0) qDebug("got a string %s", data.constData());
        // here we can process the data
    }
    void newState(QAbstractSocket::SocketState state) {
        qDebug("client new state %d", state);
        if (state == QAbstractSocket::UnconnectedState) { emit done(); }
    }
protected:
    QTcpSocket* socket;
    int descriptor;
};

// Connects to a client and sends data to it
class Sender : public QObject
{
    Q_OBJECT
public:
    Sender(const QString & address, quint16 port, QObject * parent = 0) :
        QObject(parent), socket(new QTcpSocket(this)),
        bytesInFlight(0), maxBytesInFlight(65536*8)
    {
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                SLOT(newState(QAbstractSocket::SocketState)));
        connect(socket, SIGNAL(bytesWritten(qint64)), SLOT(sentData(qint64)));
        socket->connectToHost(address, port);
        qDebug("Sender()");
    }
    ~Sender() { qDebug("~Sender()"); }
protected:
    // sends enough data to keep a maximum number of bytes in flight
    void sendData() {
        qint64 n = maxBytesInFlight - bytesInFlight;
        if (n <= 0) return;
        bytesInFlight += n;
        socket->write(QByteArray(n, 44)); // 44 is the answer, after all
    }
protected slots:
    void sentData(qint64 n) {
        bytesInFlight -= n;
        Q_ASSERT(bytesInFlight >= 0);
        sendData();
    }
    void newState(QAbstractSocket::SocketState state) {
        qDebug("sender new state %d", state);
        if (state == QAbstractSocket::ConnectedState) sendData();
    }
protected:
    QTcpSocket* socket;
    qint64 bytesInFlight;
    qint64 maxBytesInFlight;
};

// Keeps track of threads and client objects
class ThreadPool : public QTcpServer
{
    Q_OBJECT
public:
    ThreadPool(QObject* parent = 0) : QTcpServer(parent), nextThread(0) {
        for (int i=0; i < QThread::idealThreadCount(); ++i) {
            QThread * thread = new QThread(this);
            connect(thread, SIGNAL(finished()), SLOT(threadDone()));
            thread->start();
            threads << thread;
        }
        const QMetaObject & mo = Client::staticMetaObject;
        int idx = mo.indexOfMethod("newConnection(int)");
        Q_ASSERT(idx>=0);
        method = mo.method(idx);
    }
    void poolObject(QObject* obj) const {
        if (nextThread >= threads.count()) nextThread = 0;
        QThread* thread = threads.at(nextThread);
        obj->moveToThread(thread);
    }
protected:
    void incomingConnection(int descriptor) {
        Client * client;
        if (threads.isEmpty()) return;
        if (! clients.isEmpty()) {
            client = clients.dequeue();
        } else {
            client = new Client();
            connect(client, SIGNAL(done()), SLOT(clientDone()));
        }
        poolObject(client);
        method.invoke(client, Q_ARG(int, descriptor));
    }
signals:
    void finished();
public slots:
    void quit() {
        foreach (QThread * thread, threads) { thread->quit(); }
    }
private slots:
    void clientDone() {
        clients.removeAll(qobject_cast<Client*>(sender()));
    }
    void threadDone() {
        QThread * thread = qobject_cast<QThread*>(sender());
        if (threads.removeAll(thread)) delete thread;
        if (threads.isEmpty()) emit finished();
    }
private:
    QList<QThread*> threads;
    QQueue<Client*> clients;
    QMetaMethod method;
    mutable int nextThread;
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    ThreadPool server;
    if (!server.listen(QHostAddress::Any, 1101)) qCritical("cannot establish a listening server");
    const int senderCount = 10;
    Sender *prevSender = 0, *firstSender = 0;
    for (int i = 0; i < senderCount; ++ i) {
        Sender * sender = new Sender("localhost", server.serverPort());
        server.poolObject(sender);
        if (!firstSender) firstSender = sender;
        if (prevSender) sender->connect(prevSender, SIGNAL(destroyed()), SLOT(deleteLater()));
        prevSender = sender;
    }
    QTimer::singleShot(3000, firstSender, SLOT(deleteLater())); // run for 3s
    server.connect(prevSender, SIGNAL(destroyed()), SLOT(quit()));
    qApp->connect(&server, SIGNAL(finished()), SLOT(quit()));
    // Deletion chain: timeout deletes first sender, then subsequent senders are deleted,
    // finally the last sender tells the thread pool to quit. Finally, the thread pool
    // quits the application.
    return a.exec();
}

#include "main.moc"

Исходя из вашего объяснения, ваш игровой движок запускается и создает соединение с каким-либо портом на localhost. Предполагается, что ваша программа Qt принимает это соединение через порт 1101, получает некоторые строки, обрабатывает их, а затем отправляет обратно.

Код изменен, чтобы принять соединение по фиксированному номеру порта. Вся обработка данных, включая отправку ответа, должна быть выполнена из newData() слот. Вы также можете передать эти данные в другой поток, если ваши вычисления очень сложны. Под сложным я подразумеваю десятки тысяч операций, таких как сложения и умножения, или тысячи триггерных операций.

Sender класс там просто в качестве примера. Конечно, ваш игровой движок выполняет отправку, поэтому вам не нужен класс Sender.

Я получил свой старый код "неправильный способ сделать это", чтобы работать. Я думаю, что эта часть была там, где была ошибка:

//removed
tcpSocket.disconnectFromHost();
tcpSocket.waitForDisconnected();
emit finished();

...

#include <stdlib.h>
#include <QtNetwork>
#include "MeshServer.hh"
#include <iostream>
#include "TableView.hh"

using namespace  std;

FortuneServer::FortuneServer(QObject *parent)
: QTcpServer(parent)
{

}

void FortuneServer::procesServerString(string serverString){

emit procesServerStringToParent(serverString);

}
void FortuneServer::getStringToThread(string serverString){

emit getStringToThreadSignal(serverString);

}

void FortuneServer::incomingConnection(int socketDescriptor)
{


FortuneThread *serverthread = new FortuneThread(socketDescriptor, this);
//connect(&serverthread, SIGNAL(finished()), &serverthread, SLOT(deleteLater()));


QThread* thread = new QThread;

serverthread->moveToThread(thread);




connect(serverthread, SIGNAL(fromThreadString(string)), this, SLOT(procesServerString(string)));
connect(this, SIGNAL(getStringToThreadSignal(string)), serverthread, SLOT(sendString(string)));
connect(serverthread, SIGNAL(finished()), thread, SLOT(quit()));
connect(serverthread, SIGNAL(finished()), serverthread, SLOT(deleteLater()));
connect(serverthread, SIGNAL(finished()), thread, SLOT(deleteLater()));

thread->start();

}



FortuneThread::FortuneThread(int socketDescriptor, QObject *parent): QObject(parent),   socketDescriptor(socketDescriptor)

{
if (!tcpSocket.setSocketDescriptor(socketDescriptor)) {
    emit error(tcpSocket.error());
    cout<<"socket error"<<endl;
    emit finished();
    return;
}




connect(&tcpSocket, SIGNAL(readyRead()), this, SLOT(getString()));
//connect(&tcpSocket, SIGNAL(disconnected()), this, SLOT(ondisconnected()));

}

void FortuneThread::getString()
{

int joj = tcpSocket.bytesAvailable();
if(joj==0){
    tcpSocket.disconnectFromHost();
    emit finished();
    return;
}
char inbuffer[1024];
int buffer_len = 1024;
memset(inbuffer, '\0', buffer_len);
tcpSocket.read(inbuffer,1024);
string instring;
instring = inbuffer;
instring.resize(joj);

emit fromThreadString(instring);

}   


void FortuneThread::sendString(string sendoutString)
{   
char buffer2[1024];
int buffer_len = 1024;
memset(buffer2, '\0', buffer_len);
strcat(buffer2,sendoutString.c_str());

tcpSocket.write(buffer2,buffer_len);

 }

void FortuneThread::ondisconnected()
{


emit finished();


}
Другие вопросы по тегам