Как сжимать вызовы слотов при использовании соединения в очереди в Qt?

После прочтения некоторых подобных статей о связи Qt Signal-Slot у меня все еще остается вопрос, касающийся соединения в очереди.

Если у меня есть несколько потоков, посылающих сигналы друг другу все время и скажем один thread_slowработает медленный метод в своем цикле событий и другой thread_fast работает быстрый, который отправляет несколько сигналов, в то время как другой поток все еще работает, это медленный метод..... когда медленный метод из thread_slow возвращается в цикл обработки событий, будет ли он обрабатывать все сигналы, которые были отправлены ранее thread_fastили только последний (все сигналы одного типа)?

Если он будет обрабатывать все сигналы, есть ли способ сделать thread_slow только обработать последний? (Учитывая, что "последний" в многопоточном приложении может быть расплывчатым, давайте рассмотрим последний сигнал перед тем, как поток запросил последний сигнал, для простоты, поэтому новые, отправляемые, пока поток ищет последний, могут быть потеряны).

(Я спрашиваю об этом, потому что у меня есть несколько потоков, получающих данные из нескольких потоков, и я не хочу, чтобы они обрабатывали старые данные, только последние, которые были отправлены)

Я провел несколько тестов, и похоже, что Qt обработает все сигналы. Я заставил одну нить сделать:

while(true)
{
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}

а слот в другой сделаю:

void test::testQueue(int test)
{
    test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}

и поток будет работать:

while(true)
{
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}

Я посылаю сигнал из одного потока в другой каждые 500 миллисекунд, а другой поток спит в течение 3000 миллисекунд (3 секунды), а затем просыпается и увеличивает внутреннюю переменную на 100. каждый раз, когда выполняется слот, он выдает текст с полученное значение + внутренняя переменная. Результат, который я получаю, заключается в том, что каждый раз QCoreApplication::processEvents(); вызывается, все сигналы выполняются....(Я редактировал эту часть, потому что я нашел ошибку в моем предыдущем коде)

7 ответов

Решение

Я пытаюсь превратить мой комментарий в ответ. Я согласен с вами в том, что в документации не хватает этой информации, или, по крайней мере, она неясна для меня и, видимо, для вас тоже.

Было бы два варианта, чтобы получить больше информации:

1) пробная версия

Поместите оператор qDebug() или printf()/fprintf() в ваш слот в "медленном" потоке и посмотрите, что он печатает. Запустите это несколько раз и сделайте вывод.

2) Убедиться

Вам нужно прочитать исходный код для этого, как мета-объектный компилятор, иначе. moc получает это из исходного файла. Это более сложное расследование, но это может привести к определенности.

Насколько я знаю, каждому сигналу эмиссии выкладывается соответствующее событие. Затем событие будет поставлено в очередь для отдельного потока в классе потока. Здесь вы можете найти соответствующие два файла исходного кода:

void QCoreApplication:: postEvent (получатель QObject *, событие QEvent *, приоритет int)

а также

Класс QPostEventList: общедоступный QVector

Есть два подхода с их компромиссами:

Поставить в очередь занятый слот из слота мутатора данных

Основным преимуществом является то, что сигналы не могут быть потеряны во время операции занятости. Однако это может быть медленнее по своей природе, поскольку потенциально может обрабатывать гораздо больше операций, чем необходимо.

Идея состоит в том, что данные переустанавливаются для каждого обработанного события, но реальная занятая операция ставится в очередь для выполнения только один раз. Это не обязательно должно быть для первого события, если их больше, но это самая простая реализация.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
    connect(this, SIGNAL(queueBusyOperationSignal()), SLOT(busyOperation()));
    ...
}

void Foo::dataUpdateSlot(const QByteArray &data)
{
    m_data = data;

    if (busyOperationQueued);
        emit queueBusyOperationSignal();
        m_busyOperationQueued = true;
    }
}

void MyClass::busyOperationSlot()
{

    // Do the busy work with m_data here

    m_busyOperationQueued = false;    
}

Connect / Disconnect

Идея состоит в том, чтобы отключить слот от соответствующего сигнала при запуске обработки. Это будет гарантировать, что новое излучение сигнала не будет перехвачено, и снова подключит слот к сигналу, как только поток освободится для обработки следующих событий.

Это могло бы иметь некоторое время простоя в потоке, хотя между соединением и следующим даже обрабатывалось, но по крайней мере это было бы простым способом реализовать его. Это может быть даже незначительной разницей в производительности в зависимости от большего контекста, который здесь не представлен.

Основным недостатком является то, что это потеряло бы сигналы во время занятой операции.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
    ...
}

void MyClass::busyOperationSlot(const QByteArray &data)
{
    disconnect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), this, SLOT(dataUpdateSlot(const QByteArray&)));

    // Do the busy work with data here

    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
}

Будущие мысли

Я думал о том, существует ли удобный API - например, метод processEvents(), но с аргументом для обработки только последнего опубликованного события - для того, чтобы фактически сказать системе событий явно обработать последнее событие, а не обходить саму проблему. Похоже, что это такой API, но он частный.

Возможно, кто-то отправит запрос на добавление функции, чтобы опубликовать что-то подобное.

/*!
\internal
Returns \c true if \a event was compressed away (possibly deleted) and should not be added to the list.
*/
bool QCoreApplication::compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents)

Соответствующий исходный код можно найти здесь.

Это также, кажется, имеет переопределенную версию в QGuiApplication а также QApplication,

Что касается полноты, существует также такой метод, как этот:

void QCoreApplication:: removePostedEvents (QObject * receive, int eventType = 0) [статические]

Удаляет все события данного eventType, которые были опубликованы с использованием postEvent() для получателя.

События не отправляются, а удаляются из очереди. Вам никогда не нужно вызывать эту функцию. Если вы вызываете его, помните, что уничтожение событий может привести к тому, что получатель сломает один или несколько инвариантов.

Если получатель имеет значение null, события eventType удаляются для всех объектов. Если eventType равен 0, все события удаляются для получателя. Никогда не вызывайте эту функцию с eventType, равным 0. Если вы действительно вызываете ее таким образом, помните, что уничтожение событий может привести к тому, что получатель сломает один или несколько инвариантов.

Но это не совсем то, что вы хотели бы иметь здесь в соответствии с документацией.

QCoreApplication QMetaCallEvent Сжатие

Каждый вызов слота в очереди заканчивается публикацией QMetaCallEvent к целевому объекту. Событие содержит объект отправителя, идентификатор сигнала, индекс слота и параметры упакованного вызова. На Qt 5 идентификатор сигнала обычно не равен значению, возвращаемому QMetaObject::signalIndex(): это индекс, вычисляемый так, как если бы у объекта были только методы сигнала и никаких других методов.

Цель состоит в том, чтобы сжать такие вызовы, чтобы в очереди событий существовал только один уникальный вызов для данного набора (объект отправителя, сигнал отправителя, объект получателя, слот получателя).

Это единственный разумный способ сделать это, не внося изменений в исходные или целевые объекты и сохраняя при этом минимальные накладные расходы. Методы циклического повторения событий в других моих ответах имеют серьезные издержки стека на каждое событие, порядка 1 Кбайт, когда Qt создается для 64-битных архитектур с указателями.

Доступ к очереди событий можно получить, когда новые события публикуются на объекте, в котором уже есть одно или несколько событий. В таком случае, QCoreApplication::postEvent звонки QCoreApplication::compressEvent, compressEvent не вызывается, когда первое событие публикуется для объекта. При повторной реализации этого метода, содержание QMetaCallEvent отправленный на целевой объект может быть проверен на предмет вызова в ваш слот, а устаревший дубликат должен быть удален. Частные заголовки Qt должны быть включены для получения определений QMetaCallEvent, QPostEvent а также QPostEventList,

Плюсы: ни отправитель, ни получатель не должны ничего знать. Сигналы и слоты работают как есть, включая вызовы указателей на методы в Qt 5. Сам Qt использует этот способ сжатия событий.

Минусы: требует включения частных заголовков Qt и принудительной очистки QEvent::posted флаг.

Вместо взлома QEvent::posted Отметить, что события, подлежащие удалению, могут быть поставлены в очередь в отдельном списке и удалены вне compressEvent вызов, когда таймер нулевой длительности срабатывает. Это накладные расходы на дополнительный список событий, и каждое удаление события повторяется в опубликованном списке событий.

Другие подходы

Смысл делать это другим способом - не использовать внутренности Qt.

L1 Первое ограничение - отсутствие доступа к содержимому частного QMetaCallEvent, С этим можно справиться следующим образом:

  1. Прокси-объект с сигналами и слотами тех же подписей, что и у цели, может быть подключен между исходным и целевым объектами.

  2. Запуск QMetaCallEvent на объекте прокси позволяет извлекать тип вызова, идентификатор вызываемого слота и аргументы.

  3. Вместо соединений с сигнальными слотами события могут быть явно отправлены целевому объекту. Целевой объект или фильтр событий должен явно повторно синтезировать вызов слота из данных события.

  4. Обычай compressedConnect реализация может быть использована вместо QObject::connect, Это полностью раскрывает детали сигнала и слота. Прокси-объект может использоваться для выполнения сжатого эквивалента queued_activate на стороне объекта отправителя.

L2 Второе ограничение - невозможность полностью переопределить QCoreApplication::compressEvent, так как список событий определяется в частном порядке. У нас все еще есть доступ к сжатому событию, и мы все еще можем решить, удалять его или нет, но нет способа перебрать список событий. Таким образом:

  1. Доступ к очереди событий можно получить неявным образом путем рекурсивного вызова sendPostedEvents изнутри notify (таким образом, также из eventFilter(), event() или из слотов). Это не вызывает тупик, так как QCoreApplication::sendPostedEvents не может (и не держит) мьютекс цикла событий, пока событие доставляется через sendEvent, События могут быть отфильтрованы следующим образом:

    • глобально в переопределенной QCoreApplication::notify,
    • глобально, зарегистрировав QInternal::EventNotifyCallback,
    • локально, прикрепив фильтр событий к объектам,
    • явно локально путем переопределения QObject::event() в целевом классе.

    Дублированные события все еще отправляются в очередь событий. Рекурсивные вызовы notify изнутри sendPostedEvents потреблять довольно много места в стеке (бюджет 1 КБ на 64-битных архитектурах с указателями).

  2. Уже существующие события могут быть удалены с помощью вызова QCoreApplication::removePostedEvents перед публикацией нового события для объекта. К сожалению, делать это в течение QCoreApplication::compressEvent вызывает взаимоблокировку, так как мьютекс очереди событий уже удерживается.

    Пользовательский класс событий, который включает указатель на объект-получатель, может автоматически вызывать removePostedEvents в конструкторе.

  3. Существующие сжатые события, такие как QEvent::Exit может быть переназначен.

    Набор этих событий является деталью реализации и может измениться. Qt не делает различий между этими событиями, кроме как получателем QObject указатель. Реализация требует накладных расходов прокси-объекта QObject для каждого кортежа (тип события, объект-получатель).

Реализация

Код ниже работает как на Qt 4, так и на Qt 5. В последнем случае обязательно добавьте QT += core-private в ваш файл проекта qmake, так что частные заголовки Qt будут включены.

Реализации, не использующие внутренние заголовки Qt, приведены в других ответах:

Существует два пути кода для удаления событий, выбранные if (true), Включенный путь кода сохраняет самое последнее событие и, как правило, имеет смысл. В качестве альтернативы, вы можете захотеть сохранить самое старое событие - это то, что делает путь отключенного кода.

Скриншот

#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of singal indices for one or more meatobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaObject::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that is can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
              /* Keep The Newest Call */              
              // We can't merely qSwap the existing posted event with the new one, since QEvent
              // keeps track of whether it has been posted. Deletion of a formerly posted event
              // takes the posted event list mutex and does a useless search of the posted event
              // list upon deletion. We thus clear the QEvent::posted flag before deletion.
              EventHelper::clearPostedFlag(cur.event);
              delete cur.event;
              cur.event = event;
            } else {
              /* Keep the Oldest Call */
              delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"

Это другой подход. Не требует никаких изменений в отправителя и получателя объектов, но требует пользовательских CompressorProxy объект. Это переносимо как в Qt 4, так и в Qt 5 и не требует доступа к внутренностям Qt.

Объект компрессора должен быть дочерним по отношению к целевому объекту - объекту со слотами. Таким образом, он отслеживает поток целевого объекта. Так как сигналы компрессора присоединяются к слотам цели, когда они находятся в одном потоке, нет никаких затрат на соединения в очереди для вызовов целевых слотов.

Волшебство происходит в emitCheck метод: он вызывает себя рекурсивно.

  1. Слот-вызов заканчивается emitCheck,
  2. Дальнейшие опубликованные события отправляются по телефону sendPostedEvents,
  3. Если в очереди событий есть повторяющиеся вызовы слотов, они в конечном итоге emitCheck снова.
  4. Как только последнее событие в очереди выбрано, и sendPostedEvents больше не повторяется, флаг сбрасывается для данного слота, так что его прокси-сигнал не будет излучаться более одного раза. Это желаемое поведение сжатия.

Для любого заданного набора вызовов слотов в очереди к экземпляру CompressorProxy, emitCheck вернусь true только один раз для слота, который вызывался несколько раз при прохождении списка опубликованных событий.

Обратите внимание, что использование стека на рекурсивный вызов в режиме выпуска составляет около 600 байтов на 32-битных архитектурах и вдвое больше, чем на 64-битных архитектурах. В режиме отладки на OS X, использующем 64-битную сборку, использование стека на рекурсию составляет ~4 КБ.

Скриншот

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

class CompressorProxy : public QObject {
    Q_OBJECT
    bool emitCheck(bool & flag) {
        flag = true;
        QCoreApplication::sendPostedEvents(this, QEvent::MetaCall); // recurse
        bool result = flag;
        flag = false;
        return result;
    }

    bool m_slot;
    Q_SLOT void slot() {
        if (emitCheck(m_slot)) emit signal();
    }
    Q_SIGNAL void signal();

    bool m_slot_int;
    Q_SLOT void slot_int(int arg1) {
        if (emitCheck(m_slot_int)) emit signal_int(arg1);
    }
    Q_SIGNAL void signal_int(int);
public:
    // No default constructor, since the proxy must be a child of the
    // target object.
    explicit CompressorProxy(QObject * parent) : QObject(parent) {}
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        CompressorProxy * proxy = new CompressorProxy(this);
        connect(&m_signaller, SIGNAL(emptySignal()), proxy, SLOT(slot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), proxy, SLOT(slot_int(int)), Qt::QueuedConnection);
        connect(proxy, SIGNAL(signal()), this, SLOT(emptySlot()));
        connect(proxy, SIGNAL(signal_int(int)), this, SLOT(dataSlot(int)));
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"

Это еще один подход, переносимый как на Qt 4, так и на Qt 5, и не требующий доступа к внутренним компонентам Qt (кроме того, что доступно через публичные заголовки). На Qt 5 поддерживаются только соединения в стиле Qt 4. Сжатые объекты - это пары (объект-получатель, слот). Это отличается от (отправитель, получатель, сигнал, слот) кортежа, используемого, когда у каждого есть полный доступ к QMetaCallEvent,

Он использует QObject::qt_metacall выхватить детали звонка из черного ящика QMetaCallEvent, Рекурсия в sendPostedEvents используется, как и в моем другом ответе без внутренних данных.

Стоит отметить, что QObject::qt_metacallAPI остался неизменным, по крайней мере, с Qt 4.0.

Скриншот

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>
#include <QSet>
#include <QMetaMethod>

// Common Code

/*! Keeps a list of method indices for one or more meatobject classes. */
class MethodList {
    Q_DISABLE_COPY(MethodList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
public:
    MethodList() {}
    template <class T> void add(const char * slot) {
        add(T::staticMetaObject.method(T::staticMetaObject.indexOfSlot(slot)));
    }
    void add(const QMetaMethod & method) {
        Q_ASSERT(method.methodIndex() >= 0);
        m_data[method.enclosingMetaObject()].insert(method.methodIndex());
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(method.methodIndex());
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int methodId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(methodId);
    }
};
Q_GLOBAL_STATIC(MethodList, compressedSlots)

// Compressor

class Compressor : public QObject {
    enum { Idle, Armed, Valid } m_state;
    QMetaObject::Call m_call;
    int m_methodIndex;
    QSet<int> m_armed; // armed method IDs

    int qt_metacall(QMetaObject::Call call, int id, void ** args) {
        if (m_state != Armed) return QObject::qt_metacall(call, id, args);
        m_state = Valid;
        m_call = call;
        m_methodIndex = id;
        return 0;
    }
    bool eventFilter(QObject * target, QEvent * ev) {
        Q_ASSERT(target == parent());
        if (ev->type() == QEvent::MetaCall) {
            m_state = Armed;
            if (QT_VERSION < QT_VERSION_CHECK(5,0,0) || ! *(void**)(ev+1)) {
                // On Qt5, we ensure null QMetaCallEvent::slotObj_ since we can't handle Qt5-style member pointer calls
                Compressor::event(ev); // Use QObject::event() and qt_metacall to extract metacall data
            }
            if (m_state == Armed) m_state = Idle;
            // Only intercept compressed slot calls
            if (m_state != Valid || m_call != QMetaObject::InvokeMetaMethod ||
                    ! compressedSlots()->contains(target->metaObject(), m_methodIndex)) return false;
            int methodIndex = m_methodIndex;
            m_armed.insert(methodIndex);
            QCoreApplication::sendPostedEvents(target, QEvent::MetaCall); // recurse
            if (! m_armed.contains(methodIndex)) return true; // Compress the call
            m_armed.remove(methodIndex);
        }
        return false;
    }
public:
    Compressor(QObject * parent) : QObject(parent), m_state(Idle) {
        parent->installEventFilter(this);
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    compressedSlots()->add<Widget>("emptySlot()");
    compressedSlots()->add<Widget>("dataSlot(int)");
    Widget w;
    new Compressor(&w);
    w.show();
    return a.exec();
}

#include "main.moc"
thread_slow 

обработает все сигналы, отправленные в своем цикле событий, если вы использовали подключение к очереди или postEvent

Источник:

Соединение в очереди Слот вызывается, когда управление возвращается в цикл обработки событий получателя. Слот выполняется в потоке получателя.

QtDoc

Если вы хотите узнать больше о том, как обрабатывается событие, вы можете посмотреть здесь:

https://qt.gitorious.org/qt/qtbase/source/631c3dbc800bb9b2e3b227c0a09523f0f7eef0b7:src/corelib/thread/qthread_p.h#L127

Как вы можете видеть, события сортируются в порядке приоритетов, поэтому, если все ваши события имеют одинаковый приоритет, он идет первым в очереди.

Это не тривиальная задача, здесь грубая попытка, скажите мне, если это работает.

То, что я предлагаю, это в основном хранить события самостоятельно и обрабатывать только последнее.

thread_slow.h

int current_val;
bool m_isRunning;

thread_slow.cpp

void enqueue_slot( int val /*or whatever you value is*/ ) {
     // You'll enventually need a a QMutex here if your slot is not call in the thread
     m_current_val = val;
     if( !m_isRunning )
         slowRun();
}

void checkHasReceivedEventSlot() {
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}

void slowRun() {
    m_isRunning = true;
    int v = m_current_val;
    m_current_val = -1; // Invalid value

   // Do stuff with v

   // Let the queue fill itself with enqueue_slot calls
   QTimer::singleShot(kTIMEOUT, this, SLOT(checkHasReceivedEventSlot()));
}

При первом вызове enqueue_slot начнется медленный запуск

РЕДАКТИРОВАТЬ:

Чтобы убедиться, что это последнее событие, вы можете сделать следующее:

void checkHasReceivedEventSlot() {
    // Runs enqueue_slot until no more events are in the loop
    while( m_thread->eventDispatcher()->hasPendingEvents() )
         m_thread->eventDispatcher()->processEvents(QEventLoop::AllEvents);

    // m_current_val should hold the last event
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}

Из вопроса: "Если он обработает все сигналы, есть ли способ заставить thread_slow обрабатывать только последний?"

Если вы просто хотите всегда обрабатывать последний сигнал и не возражаете, если обрабатывается несколько дополнительных сигналов, если это не замедляет работу, вы можете попробовать очень простой подход, например, используя обычный QThread::exec() цикл событий. Поместите эти методы слота в QObject подкласс, который вы затем перемещаете в поток:

//slot
void MyClass::publicReceiverSlotForQueuedSignals(QString data)
{
    // Update data every time
    mReceivedData = data;

    // Allow worker method to be queued just once
    if (!mWorkerSlotInvoked) {
        mWorkerSlotInvoked = true;
        QMetaObject::invokeMethod(this, "workerSlot", Qt::QueuedConnection);
        qDebug() << "publicReceiverSlotForQueuedSignals: invoked workerSlot!"
                 << "New data:" << mReceivedData;
    } else {
        qDebug() << "publicReceiverSlotForQueuedSignals: workerSlot already invoked."
                 << "New data:" << mReceivedData;
    }
}

//slot
void MyClass::privateWorkerSlot()
{
    mWorkerSlotInvoked = false;
    qDebug() << "workerSlot for data:" << mReceivedData;
    QThread::msleep(3000);
    qDebug() << "workerSlot returning.";
}

publicReceiverSlotForQueuedSignals проходит очень быстро (qDebug в else это, вероятно, самая трудоемкая часть для быстрых вызовов), поэтому не имеет значения, сколько сигналов находится в очереди. А потом privateWorkerSlot будет вызываться только по одному на каждое событие цикла вращения этого потока, независимо от того, насколько медленно он идет.

Также было бы тривиально добавить мьютекс для защиты mReceivedData а также mWorkerSlotInvoked в обоих методах слотов (и везде вы можете использовать их). Тогда вы можете сделать прямое подключение к слоту, потому что invokeMethod является потокобезопасным, и мьютекс сделает обработку личных данных членов MyClass Поток безопасен, а также. Просто убедитесь, что вы скопировали содержимое mReceivedData в локальную переменную и разблокируйте мьютекс, прежде чем выполнять его трудоемкую обработку.

Примечание: непроверенный код, вероятно, содержит несколько ошибок.

Вы можете использовать комбинацию DirectConnection и QueueConnection:

  1. На вашей рабочей стороне (thread_slow):

    • Открытый слот, который должен вызываться вашим провайдером задач (thread_fast)

      void Working::process()
      {
         if (working)
         {
           printf("Drop a task %p\n", QThread::currentThread()); 
           return;
         }
      
        working = true;        
        emit realWork();
      }
      
    • Функция обработки (это медленно): realProcess()

      void Working::realProcess()
      {
          printf("      **** Working ... %p\n",QThread::currentThread()); fflush(stdout);
      
          // Emulate a big processing ...
          usleep(3000*1000);
      
          printf("      **** Done. %p\n",QThread::currentThread());fflush(stdout);
          working = false;
          emit done();
      }
      
    • QueueConnection от realWork в realProcess

      Working::Working()
      {
          working = false;
          connect(this,SIGNAL(realWork()),this,SLOT(realProcess()),Qt::QueuedConnection);
      }
      
  2. На вашей стороне задачи поставщика (thread_fast)

    • startWork() сигнал

      void TaskProv::orderWork()
      {
          emit startWork();
      }
      
    • Прямое соединение со слотом рабочего процесса

      QObject::connect(&taskProvider,SIGNAL(startWork()),&worker,SLOT(process()),Qt::DirectConnection);
      

Некоторые заметки:

  • Функция Working::process() будет запущен в thread_fast (даже если это рабочая функция-член), но она просто проверяет наличие флага, поэтому она не должна влиять на время обработки

  • Если вы возражаете против возможного дополнительного сброса задачи, вы можете защитить рабочий флаг рабочего с помощью мьютекса для более жесткого управления.

  • Это очень похоже на команду lpapp "Очередь занятого слота из слота мутатора данных", за исключением того, что тип соединения должен быть правильной комбинацией Direct и Queue.

Как примечание для ответа @kuba-ober - мне пришлось обновить их compressEvent(...) обработчик, чтобы проверить, что mce->sender() != nullptr до звонка m_compressedSignals.contains(...) в противном случае мой код будет segfault. Я не уверен, почему это происходит, но я также не пытаюсь сжать все события, только несколько в моей системе.

обновленный код выглядит

// Added code:
if (mce->sender() == nullptr) {
  return Base::compressEvent(event, receiver, postedEvents);
}
// end added code
if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
Другие вопросы по тегам