Неожиданное поведение Boost Threads Producer/Consumer

В настоящее время я пишу приложение (использующее надстройку), которое будет иметь один кадр захвата производителя и один кадр чтения потребителя. Я добавил оператор сна в продюсер, чтобы смоделировать время захвата кадра. Я ожидал, что потребитель будет ждать переменную условия и при первом пробуждении от производителя проснуться, чтобы прочитать фрейм. Однако в файле журнала я вижу, что потребитель (основной поток) ожидает переменную условия, однако производитель проходит через несколько уведомлений, прежде чем потребитель выходит из ожидания, чтобы прочитать кадр.

Вот мой Worker.h

class Worker {
static log4cxx::LoggerPtr m_log;

public:
    Worker();
    virtual ~Worker();

    void start();
    void stop();
    void getCurrentFrame(/*cv::Mat& frame*/);

private:
    void processFrames();

    volatile bool m_stopRequested;

    bool m_bFrameReady;
    boost::mutex m_mutex;
    boost::condition_variable condF;

    boost::shared_ptr<boost::thread> m_thread;
};

Worker.cpp

LoggerPtr Worker::m_log(Logger::getLogger("fdx.Worker"));

Worker::Worker() {
    m_bFrameReady = false;

    LOG4CXX_INFO(m_log, "Worker() c-tor");

    m_stopRequested = false;

}

Worker::~Worker() {
    LOG4CXX_INFO(m_log, "Worker() d-tor");
}

void Worker::start()
{
    LOG4CXX_INFO(m_log, "Worker()::start()");
    assert(!m_thread);

    m_thread = boost::shared_ptr<boost::thread>(new boost::thread(&Worker::processFrames, this));

    LOG4CXX_WARN(m_log, "Worker()::start() thread[" << m_thread->get_id() << "] started!");
}

void Worker::stop()
{
    LOG4CXX_INFO(m_log, "Worker()::stop()");

    if(m_thread != NULL)
    {
        LOG4CXX_INFO(m_log, "Worker()::stop() ThrId [" << m_thread->get_id() << "]");
        m_stopRequested = true;
        m_thread->join();
    }
    else
    {
        LOG4CXX_WARN(m_log, "Worker()::stop() The thread for this camera was never started.");
    }

LOG4CXX_INFO(m_log, "Worker()::stop() thread stopped!");
}

void Worker::processFrames()
{
    LOG4CXX_WARN(m_log, "Worker()::processFrames() Thread[" << boost::this_thread::get_id() << "] starting...");

    int rc = 0;
    std::stringstream ss;

    while(!this->m_stopRequested)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        LOG4CXX_WARN(m_log, "Worker()::processFrames() Got a Write lock");

        m_bFrameReady = true;
        LOG4CXX_WARN(m_log, "Worker()::processFrames() Frame ready set to true");

        boost::this_thread::sleep(boost::posix_time::milliseconds(200));

        LOG4CXX_WARN(m_log, "Worker()::processFrames() Write Un-lock");

        lock.unlock();

        LOG4CXX_WARN(m_log, "Worker()::processFrames() Notify");

        condF.notify_one();
    }
}

void Worker::getCurrentFrame()
{
    boost::mutex::scoped_lock lock(m_mutex);

    while(!this->m_bFrameReady)
    {
        LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() wait for Read lock");
        condF.wait(lock);
    }

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready; Got a Read lock");

    m_bFrameReady = false;

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready set to false");

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Read Un-lock");
    lock.unlock();

}

main.cpp

LoggerPtr logger(Logger::getLogger("TCamApp"));

int main(int argc, char** argv)
{
int rc = 0;

char cwDir[FILENAME_MAX];

Worker* pWorker = NULL;

memset(cwDir, 0, sizeof(cwDir));
getcwd(cwDir, FILENAME_MAX);

std::cout << "Current Working Dir[" << cwDir << "]" << endl;

std::stringstream ss;
ss << "" << cwDir << "/logs.properties";
std::cout << "logs.properties file[" << ss.str() << "]" << endl;

struct stat st;
if(!stat(ss.str().c_str(), &st))
{
    PropertyConfigurator::configure(ss.str());
}
else
{
    BasicConfigurator::configure();
}

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] starting...");

pWorker = new Worker();
assert(pWorker);

pWorker->start();

for(int i = 0; i < 100; i++)
{
    pWorker->getCurrentFrame();

    LOG4CXX_INFO(logger, "Iteration [" << i << "]");


    //boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}

pWorker->stop();

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] stopping...");

return rc;
}

Ниже приведена выдержка из моего файла журнала:

2012-07-11 15:33:53,943 [0x7f5707bcf780] INFO  TCamApp - Application [/home/op/workspace/TestThreads/Debug/TestThreads] starting...
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN  fdx.Worker - Worker()::start() thread[0x15e4c50] started!
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() wait for Read lock
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Thread[0x15e4c50] starting...
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready set to false
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Read Un-lock
2012-07-11 15:33:54,346 [0x7f5707bcf780] INFO  TCamApp - Iteration [0]
2012-07-11 15:33:54,346 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() wait for Read lock
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,546 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:55,148 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:55,149 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready set to false
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Read Un-lock
2012-07-11 15:33:55,149 [0x7f5707bcf780] INFO  TCamApp - Iteration [1]

Как видно из журнала, основной поток ожидает чтения, однако другой поток выдаст несколько уведомлений, прежде чем основной поток выйдет из своего ожидания ().

Я исследовал некоторые из них и подумал, что правильно их кодировал, но он не ведет себя так, как я ожидал. Буду признателен за любые советы по решению. Благодарю.

1 ответ

Решение

Это ожидается, поскольку поток продюсера спит с заблокированным мьютексом. Как только он просыпается, он уведомляет потребителя и снова блокирует его. Нет никакой гарантии "справедливости" в отношении того, кто получит блокировку мьютекса.

Похоже, вы пытаетесь реализовать асинхронную очередь. Обычно он содержит 2 условные переменные: одна для удержания производителей, когда очередь заполнена, другая для удержания потребителей, когда очередь пуста. Независимо от того, сколько времени потребуется для производства или потребления элемента в очереди, мьютекс блокируется только на время операций push / pop - что должно быть очень быстрым.

Ваш оператор сна, вероятно, просто смещает планировщик вашей ОС, чтобы дать больший приоритет потоку производителя. Переместите спящий режим из критической секции, чтобы имитировать обработку вне операции push, и вы должны увидеть, что поток потребителя более отзывчив.

На связанной ноте, вместо того, чтобы опрашивать атомарную переменную для завершения, вы можете поместить объект-страж (то есть специальное значение, например, нулевой указатель на очередь указателей) в очередь, чтобы потоки потребителя знали, что они должны остановить,

Другие вопросы по тегам