Реализация boost:: барьера в C++11

Я пытался избавить проект от каждой ссылки на повышение и перейти на чистый C++11.

В какой-то момент создаются рабочие потоки, которые ждут, пока барьер подаст команду 'go', выполнят работу (распределенную по N потокам) и синхронизируются, когда все они завершатся. Основная идея заключается в том, что основной цикл задает порядок перехода (boost:: барьер.wait()) и ожидает результата с той же функцией.

Я реализовал в другом проекте специальный барьер, основанный на версии Boost, и все работало отлично. Реализация заключается в следующем:

Barrier.h:

class Barrier {
public:
    Barrier(unsigned int n);
    void Wait(void);
private:
    std::mutex counterMutex;
    std::mutex waitMutex;

    unsigned int expectedN;
    unsigned int currentN;
};

Barrier.cpp

Barrier::Barrier(unsigned int n) {
    expectedN = n;
    currentN = expectedN;
}

void Barrier::Wait(void) {
    counterMutex.lock();

    // If we're the first thread, we want an extra lock at our disposal

    if (currentN == expectedN) {
        waitMutex.lock();
    }

    // Decrease thread counter

    --currentN;

    if (currentN == 0) {
        currentN = expectedN;
        waitMutex.unlock();

        currentN = expectedN;
        counterMutex.unlock();
    } else {
        counterMutex.unlock();

        waitMutex.lock();
        waitMutex.unlock();
    }
}

Этот код использовался в iOS и Android NDK без каких-либо проблем, но при попытке его выполнить в проекте Visual Studio 2013 кажется, что разблокировать его может только поток, заблокировавший мьютекс (утверждение: разблокировка неизвестного мьютекса).

Есть ли какая-нибудь невращающаяся (блокирующая, такая как эта) версия барьера, которую я могу использовать, которая работает для C++11? Я только смог найти барьеры, которые использовали ожидание "занято", что я хотел бы предотвратить (если на самом деле нет причин для этого).

4 ответа

Решение

Используйте std::condition_variable вместо std::mutex, чтобы заблокировать все потоки, пока последний не достигнет барьера.

class Barrier
{
private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::size_t _count;
public:
    explicit Barrier(std::size_t count) : _count(count) { }
    void Wait()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if (--_count == 0) {
            _cv.notify_all();
        } else {
            _cv.wait(lock, [this] { return _count == 0; });
        }
    }
};
class Barrier {
public:
    explicit Barrier(std::size_t iCount) : 
      mThreshold(iCount), 
      mCount(iCount), 
      mGeneration(0) {
    }

    void Wait() {
        std::unique_lock<std::mutex> lLock{mMutex};
        auto lGen = mGeneration;
        if (!--mCount) {
            mGeneration++;
            mCount = mThreshold;
            mCond.notify_all();
        } else {
            mCond.wait(lLock, [this, lGen] { return lGen != mGeneration; });
        }
    }

private:
    std::mutex mMutex;
    std::condition_variable mCond;
    std::size_t mThreshold;
    std::size_t mCount;
    std::size_t mGeneration;
};

Вот моя версия принятого ответа выше с автоматическим сбросом поведения для повторного использования; это было достигнуто путем подсчета вверх и вниз поочередно.

    /**
    * @brief Represents a CPU thread barrier
    * @note The barrier automatically resets after all threads are synced
    */
    class Barrier
    {
    private:
        std::mutex m_mutex;
        std::condition_variable m_cv;

        size_t m_count;
        const size_t m_initial;

        enum State : unsigned char {
            Up, Down
        };
        State m_state;

    public:
        explicit Barrier(std::size_t count) : m_count{ count }, m_initial{ count }, m_state{ State::Down } { }

        /// Blocks until all N threads reach here
        void Sync()
        {
            std::unique_lock<std::mutex> lock{ m_mutex };

            if (m_state == State::Down)
            {
                // Counting down the number of syncing threads
                if (--m_count == 0) {
                    m_state = State::Up;
                    m_cv.notify_all();
                }
                else {
                    m_cv.wait(lock, [this] { return m_state == State::Up; });
                }
            }

            else // (m_state == State::Up)
            {
                // Counting back up for Auto reset
                if (++m_count == m_initial) {
                    m_state = State::Down;
                    m_cv.notify_all();
                }
                else {
                    m_cv.wait(lock, [this] { return m_state == State::Down; });
                }
            }
        }
    };  

Кажется, что все приведенные выше ответы не работают, если барьер расположен слишком близко

Пример: каждый поток, выполняющий цикл while, выглядит следующим образом :

      while (true)
{
    threadBarrier->Synch();
    // do heavy computation
    threadBarrier->Synch();
    // small external calculations like timing, loop count, etc, ...
}

А вот решение с использованием STL:

      class ThreadBarrier
{
public:
    int m_threadCount = 0;
    int m_currentThreadCount = 0;

    std::mutex m_mutex;
    std::condition_variable m_cv;

public:
    inline ThreadBarrier(int threadCount)
    {
        m_threadCount = threadCount;
    };

public:
    inline void Synch()
    {
        bool wait = false;

        m_mutex.lock();

        m_currentThreadCount = (m_currentThreadCount + 1) % m_threadCount;

        wait = (m_currentThreadCount != 0);

        m_mutex.unlock();

        if (wait)
        {
            std::unique_lock<std::mutex> lk(m_mutex);
            m_cv.wait(lk);
        }
        else
        {
            m_cv.notify_all();
        }
    };

};

И решение для Windows:

      class ThreadBarrier
{
public:
    SYNCHRONIZATION_BARRIER m_barrier;

public:
    inline ThreadBarrier(int threadCount)
    {
        InitializeSynchronizationBarrier(
            &m_barrier,
            threadCount,
            8000);
    };

public:
    inline void Synch()
    {
        EnterSynchronizationBarrier(
            &m_barrier,
            0);
    };

};
Другие вопросы по тегам