Я реализую очередь без блокировки, описанную Энтони Уильямсом в "Параллелизме C++ в действии". Я тестирую это как новый контейнер libcds. Поп и пуш тесты работают нормально. Но тестирование нескольких производителей, тестирование нескольких потребителей иногда не удается. VLD (или Intel Inspector XE, или ASan) показывает утечку памяти. Я исправляю это, добавляя деструктор узла, но проблема с наличием всех элементов все еще остается. Как я могу решить эту проблему? Благодарю.

Уильямс без блокировки очереди:

#include <memory>
template <class T>
class williams_queue
    counted_node_ptr counted_node;
    counted_node.ptr = new node;
    counted_node.external_count = 1;


  williams_queue(const lock_free_queue_mpmc& other) = delete;
  williams_queue& operator=(const lock_free_queue_mpmc& other) = delete;

    counted_node_ptr old_head = head_.load();
    while (node* const old_node = old_head.ptr)
      delete old_node;
      old_head = head_.load();

  void push(const T& new_value)
    std::unique_ptr<T> new_data(new T(new_value));

    counted_node_ptr new_next;
    new_next.ptr = new node;
    new_next.external_count = 1;
    counted_node_ptr old_tail = tail_.load();

    while (true)
      increase_external_count(tail_, old_tail);
      T* old_data = nullptr;
      if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
        counted_node_ptr old_next = {0};
        if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
          delete new_next.ptr;
          new_next = old_next;
        set_new_tail(old_tail, new_next);
        counted_node_ptr old_next = {0};
        if(old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
          old_next = new_next;
          new_next.ptr = new node;
        set_new_tail(old_tail, old_next);

  bool pop(Func f)
    counted_node_ptr old_head = head_.load(std::memory_order_relaxed);
    while (true)
      increase_external_count(head_, old_head);
      node* const ptr = old_head.ptr;
      if(ptr == tail_.load().ptr)
        release_ref( p );
        return false;
      counted_node_ptr next = ptr->next.load();
      if (head_.compare_exchange_strong(old_head,next))
        T* const res = ptr->data.exchange(nullptr);
        return true;

  struct node;

  struct counted_node_ptr
    int external_count;
    node* ptr;

  struct node_counter
    unsigned internal_count    : 30;
    unsigned external_counters : 2;

  struct node
    std::atomic<T*> data;
    std::atomic<node_counter> count;
    std::atomic<counted_node_ptr> next;

      node_counter new_count;
      new_count.internal_count    = 0;
      new_count.external_counters = 2;

      counted_node_ptr new_next;
      new_next.ptr            = nullptr;
      new_next.external_count = 0;


  static void release_ref(node * p)
      node_counter old_counter = p->count.load(std::memory_order_relaxed);
      node_counter new_counter;

      while(!p->count.compare_exchange_strong(old_counter, new_counter,

      if(!new_counter.internal_count && !new_counter.external_counters)
        delete p;

  void set_new_tail(counted_node_ptr& old_tail,
                    const counted_node_ptr& new_tail)
    node* const current_tail_ptr = old_tail.ptr;

    while (!tail_.compare_exchange_weak(old_tail, new_tail) &&
       old_tail.ptr == current_tail_ptr);

    if(old_tail.ptr == current_tail_ptr)

  static void increase_external_count(std::atomic<counted_node_ptr>&     counter,
                                      counted_node_ptr& old_counter)
    counted_node_ptr new_counter;

      new_counter = old_counter;
    while(!counter.compare_exchange_strong(old_counter, new_counter,

    old_counter.external_count = new_counter.external_count;

  static void free_external_counter(counted_node_ptr& old_node_ptr)
    node* const ptr = old_node_ptr.ptr;
    const int count_increase = old_node_ptr.external_count - 2;
    node_counter old_counter= ptr->count.load(std::memory_order_relaxed);
    node_counter new_counter;

      new_counter = old_counter;
      new_counter.internal_count += count_increase;
    while(!ptr->count.compare_exchange_strong(old_counter, new_counter,

    if(!new_counter.internal_count && !new_counter.external_counters)
      delete ptr;


  std::atomic<counted_node_ptr> head_;
  std::atomic<counted_node_ptr> tail_;


Результат испытаний:

Визуальный детектор утечки считывает настройки из: D:\Development\COMMON_UTILS\ Визуальный детектор утечки \vld.ini Установлен визуальный детектор утечки версии 2.5. libcds version 2.1.0 Тест запущен 2017-Jan-31 01:19:03 Использование тестового конфигурационного файла: test-debug.conf Топология системы: число логических процессоров: 4

   reader count=3 writer count=3 item count=99999...
        Item count: 0
        Item count: 0
        Item count: 0
        Post pops: 0
        Reader 0 popped count=35822
        Reader 1 popped count=32755
        Reader 2 popped count=31420
        Readers: duration=0.893811, success pop=99997, failed pops=261140
        Writers: duration=0.841302, failed push=0

d: \ development \ libcds \ tests \ unit \ queue \ queue_reader_writer.cpp (253): CPKUNIT_CH ECK(nTotalPops + nPostTestPops == nQueueSize: popped=99997 должно быть 99999); Проверить последовательность вытесненной последовательности...

ВНИМАНИЕ: Визуальный детектор утечек обнаружил утечки памяти! ---------- Блок 116955 в 0x00DB33D0: 8 байтов ---------- Хэш утечки: 0xD835B211, Количество: 1, Всего 8 байтов, стек вызовов (TID 2836): ucrtbased.dll!malloc() f:\dd\vctools\crt\vcstartup\src\heap\new_scalar.cpp (19): unit-queue_d.exe!o perator new() + 0x9 байт d:\development\libcds\cds\container\williams_queue.h (297): unit-queue_d.exe!cds::container::WilliamsQueue::push() d:\development\libcds\tests\unit\queue\queue_reader_writer.cpp (85): unit-qu eue_d.exe!queue::Queue_ReaderWriter::WriterThread >::t est() + 0xF байт

Затем я исправляю утечки памяти, добавляя деструктор узла с удалением данных. Но неудачные испытания все еще остаются.

Код тестового прогона

namespace {
    static size_t s_nReaderThreadCount = 4;
    static size_t s_nWriterThreadCount = 4;
    static size_t s_nQueueSize = 100000; // by default 4000000;

    struct Value {
        size_t      nNo;
        size_t      nWriterNo;

class Queue_ReaderWriter: public CppUnitMini::TestCase
    template <class Queue>
    class WriterThread: public CppUnitMini::TestThread
        Queue&              m_Queue;
        double              m_fTime;
        size_t              m_nPushFailed;

        virtual void test()
            size_t nPushCount = getTest().m_nThreadPushCount;
            Value v;
            v.nWriterNo = m_nThreadNo;
            v.nNo = 0;
            m_nPushFailed = 0;

            m_fTime = m_Timer.duration();

            while ( v.nNo < nPushCount ) {
                if (m_Queue.push(v)) {  

            m_fTime = m_Timer.duration() - m_fTime;
            getTest().m_nWriterDone.fetch_add( 1 );

    template <class Queue>
    class ReaderThread: public CppUnitMini::TestThread
        Queue&              m_Queue;
        double              m_fTime;
        size_t              m_nPopEmpty;
        size_t              m_nPopped;
        size_t              m_nBadWriter;

        typedef std::vector<size_t> TPoppedData;
        std::vector<TPoppedData>        m_WriterData;

        virtual void test()
            m_nPopEmpty = 0;
            m_nPopped = 0;
            m_nBadWriter = 0;
            const size_t nTotalWriters = s_nWriterThreadCount;
            Value v;

            m_fTime = m_Timer.duration();

            while ( true ) {
                if ( m_Queue.pop( v ) ) {
                    if ( /*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters )
                        m_WriterData[ v.nWriterNo ].push_back( v.nNo );

                if ( m_Queue.empty() ) {
                    if ( getTest().m_nWriterDone.load() >= nTotalWriters ) {
                        CPPUNIT_MSG("    Item count: " << m_Queue.size());
                        if ( m_Queue.empty() )

            m_fTime = m_Timer.duration() - m_fTime;

    size_t                  m_nThreadPushCount;
    atomics::atomic<size_t>     m_nWriterDone;

    template <class Queue>
    void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0  )
        typedef ReaderThread<Queue> Reader;
        typedef WriterThread<Queue> Writer;

        size_t nPostTestPops = 0;
            Value v;
            while ( testQueue.pop( v ))
        CPPUNIT_MSG("    Post pops: " << nPostTestPops);

        double fTimeWriter = 0;
        double fTimeReader = 0;
        size_t nTotalPops = 0;
        size_t nPopFalse = 0;
        size_t nPoppedItems = 0;
        size_t nPushFailed = 0;

        std::vector< Reader * > arrReaders;

        for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
            Reader * pReader = dynamic_cast<Reader *>( *it );
            if ( pReader ) {
                fTimeReader += pReader->m_fTime;
                nTotalPops += pReader->m_nPopped;
                nPopFalse += pReader->m_nPopEmpty;
                arrReaders.push_back( pReader );
                CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );

                size_t nPopped = 0;
                for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
                    nPopped += pReader->m_WriterData[n].size();

                CPPUNIT_MSG( "    Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
                nPoppedItems += nPopped;
            else {
                Writer * pWriter = dynamic_cast<Writer *>( *it );
                CPPUNIT_ASSERT( pWriter != nullptr );
                fTimeWriter += pWriter->m_fTime;
                nPushFailed += pWriter->m_nPushFailed;
                if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
                    CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
                        "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
        CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );

        CPPUNIT_MSG( "    Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
        CPPUNIT_MSG( "    Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );

        size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
        CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
        CPPUNIT_CHECK( testQueue.empty() );

    template <class Queue>
    void test()
        m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
        CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
            << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );

        Queue testQueue;
        CppUnitMini::ThreadPool pool( *this );

        m_nWriterDone.store( 0 );

        // Writers must be first
        pool.add( new WriterThread<Queue>( pool, testQueue ), s_nWriterThreadCount );
        pool.add( new ReaderThread<Queue>( pool, testQueue ), s_nReaderThreadCount );


        analyze( pool, testQueue );
        CPPUNIT_MSG( testQueue.statistics() );

Трассировка стека от VLD говорит вам, где память была выделена, но не освобождена: WilliamsQueue::push, строка 297 в вашем заголовке.

Где эта выделенная память иногда просачивается, вероятно, в old_next = new_next линия. Вы копируете существующий counted_node_ptr поверх пустой, выделите новую память, тогда нет очевидного места для ранее выделенной памяти, которая будет удалена.

