Уведомление переменной условия пропущено

На данный момент я пишу какой-то шаблон Fork/Join с использованием std::threads. Поэтому я написал класс-оболочку для std:: thread, который использует счетчик ссылок для всех детей.

Всякий раз, когда дочерний процесс завершает свое выполнение, счетчик ссылок уменьшается и уведомление отправляется всем ожидающим потокам. Ожидающие потоки ожидают, когда счетчик ссылок станет 0, что означает, что все дочерние потоки завершили свое выполнение.

К сожалению, иногда кажется, что уведомление пропускается. Я отладил программу, используя gdb, который показал мне, что счетчик ссылок в самом глубоком потоке блокировки на самом деле уже равен 0, но он его не распознал.

Класс называется ThreadAttachment:

/**
 * \brief For the \p ThreadScheduler the attachment object is a thread itself since for each task a single thread is created.
 *
 * Children management is required for the fork/join model. It is realized by using an atomic reference counter.
 * The reference counter is initially set or changed dynamically by threadsafe operations.
 * It is decreased automatically whenever a child task finishes its execution.
 */
class ThreadAttachment : public Attachment
{
    public:
        /**
         * Creates a new thread attachment without creating the actual thread nor starting it.
         * \param task The thread attachment is created for the corresponding task \p task.
         */
        ThreadAttachment(Task *task);
        virtual ~ThreadAttachment();

        /**
         * Sets the counter of the child tasks.
         * \note Threadsafe.
         */
        void setChildCount (int count);
        /**
         * Increments the counter of the child tasks by one.
         * \note Threadsafe.
         */
        void incrementChildCount();
        /**
         * Decrements the counter of the child tasks by one.
         *
         * Besides it notifies \ref m_childrenConditionVariable for all threads which means that all threads which are calling \ref joinChildren() are being awakened.
         * \note Threadsafe.
         */
        void decrementChildCount();
        /**
         * \return Returns the counter of the child tasks.
         * \note Threadsafe.
         */
        int childCount();
        /**
         * Joins all added children thread attachments.
         * Waits for notifications of \ref m_childrenConditionVariable if the counter of child tasks is not already 0.
         * Checks on each notification for the counter to become 0. If the counter is finally 0 it stops blocking and continues the execution.
         */
        void joinChildren();

        /**
         * Allocates the actualy std::thread instance which also starts the thread immdiately.
         * The thread executes the corresponding task safely when executed itself by the operating systems thread scheduler.
         * \note This method should only be called once.
         */
        void start();

        /**
         * Joins the previously with \ref start() allocated and started std::thread.
                 * If the std::thread is already done it continues immediately.
         */
        void join();

        /**
         * Detaches the previously with \ref start() allocated and started std::thread.
         * This releases the thread as well as any control.
         */
        void detach();

    private:
        /**
         * The thread is created in \ref start().
         * It must be started after all attachment properties have been set properly.
         */
        std::unique_ptr<std::thread> m_thread;
        /**
         * This mutex protects concurrent operations on \ref m_thread.
         */
        std::mutex m_threadMutex;
        /**
         * A reference counter for all existing child threads.
         * If this value is 0 the thread does not have any children.
         */
        std::atomic_int m_childrenCounter;
        /**
         * This mutex is used for the condition variable \ref m_childrenConditionVariable when waiting for a notification.
         */
        std::mutex m_childrenConditionVariableMutex;
        /**
         * This condition variable is used to signal this thread whenever one of his children finishes and its children counter is decreased.
         * Using this variable it can wait in \ref join() for something to happen.
         */
        std::condition_variable m_childrenConditionVariable;
};

Метод start() запускает поток:

void ThreadAttachment::start()
{
    /*
     * Use one single attachment object only once for one single task.
     * Do not recycle it to prevent confusion.
     */
    assert(this->m_thread.get() == nullptr);
    ThreadAttachment *attachment = this;

    /*
     * Lock the mutex to avoid data races on writing the unique pointer of the thread which is not threadsafe itself.
     * When the created thread runs it can write data to itself safely.
     * It is okay to lock the mutex in the method start() since the creation of the thread does not block.
     * It immediately returns to the method start() in the current thread.
     */
    std::mutex &mutex = this->m_threadMutex;
    {
        std::lock_guard<std::mutex> lock(mutex);

        /*
         * The attachment should stay valid until the task itself is destroyed.
         * So it can be passed safely.
         *
         * http://stackru.com/a/7408135/1221159
         *
         * Since this call does not block and the thread's function is run concurrently the mutex will be unlocked and then the thread can acquire it.
         */
        this->m_thread.reset(new std::thread([attachment, &mutex]()
        {
            /*
             * Synchronize with the thread's creation.
             * This lock will be acquired after the method start() finished creating the thread.
             * It is used as simple barrier but should not be hold for any time.
             * Otherwise potential deadlocks might occur if multiple locks are being hold especially in decreaseParentsChildrenCounter()
             */
            {
                std::lock_guard<std::mutex> lock(mutex);
            }

            attachment->correspondingTask()->executeSafely();

            /*
             * After spawning and joining in the task's logic there should be no more children left.
             */
            assert(attachment->childCount() == 0);

            /*
             * Finally the children counter of the parent task has to be decreased.
             * This has to be done by the scheduler since it is a critical area (access of the different attachments) and therefore must be locked.
             */
            ThreadScheduler *scheduler = dynamic_cast<ThreadScheduler*>(attachment->correspondingTask()->scheduler());
            assert(scheduler);
            scheduler->decreaseParentsChildrenCounter(attachment);
        }));
    }
}

Это метод lowerParentsChildrenCounter() класса ThreadScheduler:

void ThreadScheduler::decreaseParentsChildrenCounter(ThreadAttachment *attachment)
{
    {
        std::lock_guard<std::mutex> lock(this->m_mutex);

        Task *child = attachment->correspondingTask();

        assert(child != nullptr);

        Task *parent = child->parent();

        if (parent != nullptr)
        {
            Attachment *parentAttachment = this->attachment(parent);
            assert(parentAttachment);
            ThreadAttachment *parentThreadAttachment = dynamic_cast<ThreadAttachment*>(parentAttachment);
            assert(parentThreadAttachment);
            /*
             * The parent's children counter must still be greater than 0 since this child is still missing.
             */
            assert(parentThreadAttachment->childCount() > 0);
            parentThreadAttachment->decrementChildCount();
        }
    }
}

Он в основном вызывает decmentChildCount() для родительского потока.

Метод joinChildren() ожидает завершения работы всех детей:

void ThreadAttachment::joinChildren()
{
    /*
     * Since the condition variable is notified each time the children counter is decremented
     * it will always awake the wait call.
     * Otherwise the predicate check will make sure that the parent thread continues work.
     */
    std::unique_lock<std::mutex> l(this->m_childrenConditionVariableMutex);
    this->m_childrenConditionVariable.wait(l,
        [this]
        {
            /*
             * When the children counter reached 0 no more children are executing and the parent can continue its work.
             */
            return this->childCount() == 0;
        }
    );
}

Это операции атомарного счетчика, и, как вы можете видеть, я отправляю уведомление всякий раз, когда значение уменьшается:

void ThreadAttachment::setChildCount(int counter)
{
    this->m_childrenCounter = counter;
}

void ThreadAttachment::incrementChildCount()
{
    this->m_childrenCounter++;
}

void ThreadAttachment::decrementChildCount()
{
    this->m_childrenCounter--;

    /*
     * The counter should never be less than 0.
     * Otherwise it has not been initialized properly.
     */
    assert(this->childCount() >= 0);

    /*
     * Notify all thread which call joinChildren() which should usually only be its parent thread.
     */
    this->m_childrenConditionVariable.notify_all();
}

int ThreadAttachment::childCount()
{
    return this->m_childrenCounter.load();
}

В качестве тестового примера я вычисляю число Фибоначчи рекурсивно с использованием шаблона Fork/Join. Я подумал, что если уведомление пропущено, оно должно проверить предикат и определить счетчик дочерних элементов равным 0. Очевидно, значение становится равным 0, так как его можно пропустить?

1 ответ

Решение

Обновите переменные, влияющие на условие (в этом случае член count) только в пределах блокировки для мьютекса, соответствующего условию (this->m_childrenConditionVariableMutex).

Смотрите этот ответ для рассуждений.

Другие вопросы по тегам