C++ pthread блокировка очереди тупик (я думаю)

У меня проблема с pthreads, где я думаю, что захожу в тупик. Я создал блокирующую очередь, которая, как мне показалось, работала, но после некоторого дополнительного тестирования я обнаружил, что если я пытаюсь отменить несколько потоков, которые блокируют блокирующую очередь, я, похоже, захожу в тупик.

Очередь блокировки очень проста и выглядит так:

template <class T> class Blocking_Queue
{
public:
    Blocking_Queue()
    {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }

    ~Blocking_Queue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

    void put(T t)
    {
        pthread_mutex_lock(&_lock);
        _queue.push(t);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_lock);
    }

     T pull()
     {
        pthread_mutex_lock(&_lock);
        while(_queue.empty())
        {
            pthread_cond_wait(&_cond, &_lock);
        }

        T t = _queue.front();
        _queue.pop();

        pthread_mutex_unlock(&_lock);

        return t;
     }

priavte:
    std::queue<T> _queue;
    pthread_cond_t _cond;
    pthread_mutex_t _lock;
}

Для тестирования я создал 4 потока, которые тянут эту очередь блокировки. Я добавил несколько операторов печати в очередь блокировки, и каждый поток получает метод pthread_cond_wait(). Однако, когда я пытаюсь вызвать pthread_cancel() и pthread_join() в каждом потоке, программа просто зависает.

Я также проверил это только с одной нитью, и она отлично работает

Согласно документации, pthread_cond_wait() является точкой отмены, поэтому вызов метода cancel для этих потоков должен привести к остановке их выполнения (и это работает только с 1 потоком). Однако pthread_mutex_lock не является точкой отмены. Может ли что-то происходить по аналогии с вызовом pthread_cancel(), отмененный поток извлекает мьютекс перед завершением и не разблокирует его, а затем, когда следующий поток отменяется, он не может получить мьютекс и взаимоблокировки? Или есть что-то еще, что я делаю не так.

Любой совет был бы прекрасен. Спасибо:)

3 ответа

Решение

pthread_cancel() лучше всего избегать.

Вы можете разблокировать все ваши потоки, заблокированные в Blocking_Queue::pull(), сгенерировав исключение оттуда.

Одним из слабых мест в очереди является то, что T t = _queue.front(); вызывает конструктор копирования T, который может вызвать исключение, делая мьютекс очереди заблокированным навсегда. Лучше использовать блокировку C++.

Вот пример изящного завершения потока:

$ cat test.cc
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <exception>
#include <list>
#include <stdio.h>

struct BlockingQueueTerminate
    : std::exception
{};

template<class T>
class BlockingQueue
{
private:
    boost::mutex mtx_;
    boost::condition_variable cnd_;
    std::list<T> q_;
    unsigned blocked_;
    bool stop_;

public:
    BlockingQueue()
        : blocked_()
        , stop_()
    {}

    ~BlockingQueue()
    {
        this->stop(true);
    }

    void stop(bool wait)
    {
        // tell threads blocked on BlockingQueue::pull() to leave
        boost::mutex::scoped_lock lock(mtx_);
        stop_ = true;
        cnd_.notify_all();

        if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
            while(blocked_)
                cnd_.wait(lock);
    }

    void put(T t)
    {
        boost::mutex::scoped_lock lock(mtx_);
        q_.push_back(t);
        cnd_.notify_one();
    }

    T pull()
    {
        boost::mutex::scoped_lock lock(mtx_);

        ++blocked_;
        while(!stop_ && q_.empty())
            cnd_.wait(lock);
        --blocked_;

        if(stop_) {
            cnd_.notify_all(); // tell stop() this thread has left
            throw BlockingQueueTerminate();
        }

        T front = q_.front();
        q_.pop_front();
        return front;
    }
};

void sleep_ms(unsigned ms)
{
    // i am using old boost
    boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(ms));
    // with latest one you can do this
    //boost::thread::sleep(boost::posix_time::milliseconds(10));
}

void thread(int n, BlockingQueue<int>* q)
try
{
    for(;;) {
        int m = q->pull();
        printf("thread %u: pulled %d\n", n, m);
        sleep_ms(10);
    }
}
catch(BlockingQueueTerminate&)
{
    printf("thread %u: finished\n", n);
}

int main()
{
    BlockingQueue<int> q;

    // create two threads
    boost::thread_group tg;
    tg.create_thread(boost::bind(thread, 1, &q));
    tg.create_thread(boost::bind(thread, 2, &q));
    for(int i = 1; i < 10; ++i)
        q.put(i);
    sleep_ms(100); // let the threads do something
    q.stop(false); // tell the threads to stop
    tg.join_all(); // wait till they stop
}

$ g++ -pthread -Wall -Wextra -o test -lboost_thread-mt test.cc

$ ./test
thread 2: pulled 1
thread 1: pulled 2
thread 1: pulled 3
thread 2: pulled 4
thread 1: pulled 5
thread 2: pulled 6
thread 1: pulled 7
thread 2: pulled 8
thread 1: pulled 9
thread 2: finished
thread 1: finished

У меня был похожий опыт работы с pthread_cond_wait() / pthread_cancel(). У меня были проблемы с блокировкой, которая все еще удерживалась после того, как поток вернулся по какой-то причине, и было невозможно разблокировать ее, так как вы должны разблокировать в том же потоке, который вы заблокировали. Я заметил эти ошибки при выполнении pthread_mutex_destroy(), так как у меня был один производитель, ситуация с одним потребителем, поэтому тупика не возникало.

Предполагается, что pthread_cond_wait () блокирует мьютекс при возврате, и это могло произойти, но окончательная разблокировка не прошла, так как мы принудительно отменили поток. В целях безопасности я обычно стараюсь вообще не использовать pthread_cancel (), так как некоторые платформы даже не поддерживают это. Вы можете использовать летучий тип bool или атомарный и проверить, должна ли быть закрыта нить. Таким образом, мьютексы также будут обрабатываться чисто.

Я не совсем знаком с pthread_cancel() - я предпочитаю совместное завершение.

Разве pthread_cancel () не оставит ваш мьютекс заблокированным? Я полагаю, вам нужно очистить с помощью обработчика отмены.

Другие вопросы по тегам