Почему эта блокировка свободной очереди работает?
Я переписываю старую реализацию очереди без блокировки, я начал с использования memory_order_relaxed для всего с целью ужесточения семантики памяти и добавления автономных заборов и т. Д. Позже. Но, как ни странно, это работает.. Я попытался скомпилировать как XCode, так и VS2015 с максимальными настройками оптимизации. У меня был код, очень похожий на этот, который потерпел неудачу примерно 1-1,5 года назад, когда я в последний раз писал это.
Вот моя очередь:
#ifndef __LOCKFREEMPMCQUEUE_H__
#define __LOCKFREEMPMCQUEUE_H__
#include <atomic>
template <typename T>
class LockFreeMPMCQueue
{
public:
explicit LockFreeMPMCQueue(size_t size)
: m_data(new T[size])
, m_size(size)
, m_head_1(0)
, m_head_2(0)
, m_tail_1(0)
, m_tail_2(0)
{
}
virtual ~LockFreeMPMCQueue() { delete m_data; }
bool try_enqueue(const T& value)
{
size_t tail = m_tail_1.load(std::memory_order_relaxed);
const size_t head = m_head_2.load(std::memory_order_relaxed);
const size_t count = tail - head;
if (count == m_size)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_tail_1, &tail, (tail + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
m_data[tail % m_size] = value;
while (m_tail_2.load(std::memory_order_relaxed) != tail)
{
std::this_thread::yield();
}
m_tail_2.store(tail + 1, std::memory_order_relaxed);
return true;
}
bool try_dequeue(T& out)
{
size_t head = m_head_1.load(std::memory_order_relaxed);
const size_t tail = m_tail_2.load(std::memory_order_relaxed);
if (head == tail)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_head_1, &head, (head + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
out = m_data[head % m_size];
while (m_head_2.load(std::memory_order_relaxed) != head)
{
std::this_thread::yield();
}
m_head_2.store(head + 1, std::memory_order_relaxed);
return true;
}
size_t capacity() const { return m_size; }
private:
T* m_data;
size_t m_size;
std::atomic<size_t> m_head_1;
std::atomic<size_t> m_head_2;
std::atomic<size_t> m_tail_1;
std::atomic<size_t> m_tail_2;
};
#endif
И вот тест, который я написал:
#include <chrono>
#include <thread>
#include <vector>
#include "LockFreeMPMCQueue.h"
std::chrono::microseconds::rep test(LockFreeMPMCQueue<size_t>& queue, char* memory, const size_t num_threads, const size_t num_values)
{
memset(memory, 0, sizeof(char) * num_values);
const size_t num_values_per_thread = num_values / num_threads;
std::thread* reader_threads = new std::thread[num_threads];
std::thread* writer_threads = new std::thread[num_threads];
auto start = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i] = std::thread([i, &queue, memory, num_values_per_thread]()
{
for (size_t x = 0; x < num_values_per_thread; ++x)
{
size_t value;
while (!queue.try_dequeue(value))
{
}
memory[value] = 1;
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
writer_threads[i] = std::thread([i, &queue, num_values_per_thread]()
{
const size_t offset = i * num_values_per_thread;
for (size_t x = 0; x < num_values_per_thread; ++x)
{
const size_t value = offset + x;
while (!queue.try_enqueue(value))
{
}
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i].join();
writer_threads[i].join();
}
auto time_taken = std::chrono::high_resolution_clock::now() - start;
delete[] reader_threads;
delete[] writer_threads;
bool fail = false;
for (size_t i = 0; i < num_values; ++i)
{
if (memory[i] == 0)
{
printf("%u = 0\n", i);
fail = true;
}
}
if (fail)
{
printf("FAIL!\n");
}
return std::chrono::duration_cast<std::chrono::milliseconds>(time_taken).count();
}
int main(int argc, char* argv[])
{
const size_t num_threads_max = 16;
const size_t num_values = 1 << 12;
const size_t queue_size = 128;
const size_t num_samples = 128;
LockFreeMPMCQueue<size_t> queue( queue_size );
char* memory = new char[num_values];
const double inv_num_samples = 1.0 / double( num_samples );
for( size_t num_threads = 1; num_threads <= num_threads_max; num_threads *= 2 )
{
double avg_time_taken = 0.0;
for( size_t i = 0; i < num_samples; ++i )
{
avg_time_taken += test( queue, memory, num_threads, num_values ) * inv_num_samples;
}
printf("%u threads, %u ms\n", num_threads, avg_time_taken);
}
delete[] memory;
char c;
scanf("%c", &c);
return 0;
}
Любая помощь высоко ценится!
1 ответ
Порядок памяти определяет только минимальную гарантию, которую вы запрашиваете у сгенерированного кода. Компилятор и аппаратные средства могут давать более сильные гарантии, если пожелают.
В частности, обратите внимание, что на платформах x86 многие обращения к памяти всегда синхронизируются аппаратными средствами (например, нагрузки на x86 всегда последовательны). Вот почему код, который отлично работает на x86, часто ломается при портировании на ARM или PowerPC без более слабой синхронизации по умолчанию на этих платформах.
У Херба Саттера есть отличная таблица сравнения в его лекции по атомному оружию из C++ и после 2012 года (начинается примерно через 31 минуту в видео, или посмотрите слайды под названием " Генерация кода", начиная со страницы 34), где он показывает, как различные упорядочения памяти может или не может привести к разному коду, сгенерированному для разных платформ.
Итог: то, что ваш код теперь работает нормально на вашем компьютере, не означает, что он корректен. Это одна из основных причин, по которой вам не следует возиться с упорядочением памяти, если вы точно не знаете , что делаете (и даже тогда вы, вероятно, все равно не должны этого делать).