Как добиться безблокировочного, но блокирующего поведения?
Я реализую очередь без единого производителя без блокировки для одного интенсивного сетевого приложения. У меня есть куча рабочих потоков, получающих работу в своих отдельных очередях, которые они затем удаляют и обрабатывают.
Удаление блокировок из этих очередей значительно улучшило производительность при высокой нагрузке, но они больше не блокируются, когда очереди пусты, что, в свою очередь, приводит к стремительному увеличению загрузки ЦП.
Как я могу эффективно заставить поток блокироваться, пока он не сможет успешно удалить что-то из очереди или не будет убит / прерван?
6 ответов
Если вы работаете в Linux, посмотрите на использование Futex. Он обеспечивает производительность реализации без блокировки, используя атомарные операции, а не вызовы ядра, как мьютекс, но если вам нужно установить процесс в режим ожидания из-за того, что какое-то условие не является истинным (то есть конфликт блокировки), он будет затем сделайте соответствующие вызовы ядра, чтобы перевести процесс в спящий режим и вернуть его в рабочее состояние в будущем. По сути, это очень быстрый семафор.
В Linux futex может использоваться для блокировки потока. Но имейте в виду, что Futexes являются хитрыми!
ОБНОВЛЕНИЕ: условные переменные гораздо безопаснее использовать, чем фьютексы, и они более переносимы. Однако условная переменная используется в сочетании с мьютексом, поэтому, строго говоря, результат больше не будет свободным от блокировки. Однако, если вашей основной целью является производительность (а не гарантия глобального прогресса), а заблокированная часть (то есть условие для проверки после пробуждения потока) мала, может случиться так, что вы получите удовлетворительные результаты без необходимости углубляться в тонкости интеграции фьютексов в алгоритм.
Если вы работаете в Windows, вы не сможете использовать фьютексы, но в Windows Vista есть похожий механизм, называемый Keyed Events. К сожалению, это не является частью опубликованного API (это собственный API NTDLL), но вы можете использовать его, если вы принимаете предостережение, что оно может измениться в будущих версиях Windows (и вам не нужно работать на до-Vista ядра). Обязательно прочитайте статью, на которую я ссылался выше. Вот непроверенный эскиз того, как это может работать:
/* Interlocked SList queue using keyed event signaling */
struct queue {
SLIST_HEADER slist;
// Note: Multiple queues can (and should) share a keyed event handle
HANDLE keyed_event;
// Initial value: 0
// Prior to blocking, the queue_pop function increments this to 1, then
// rechecks the queue. If it finds an item, it attempts to compxchg back to
// 0; if this fails, then it's racing with a push, and has to block
LONG block_flag;
};
void init_queue(queue *qPtr) {
NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
InitializeSListHead(&qPtr->slist);
qPtr->blocking = 0;
}
void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
InterlockedPushEntrySList(&qPtr->slist, entry);
// Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
// have committed to a keyed-event handshake
LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
if (oldv) {
NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
}
}
SLIST_ENTRY *queue_pop(queue *qPtr) {
SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
if (entry)
return entry; // fast path
// Transition block flag 0 -> 1. We must recheck the queue after this point
// in case we race with queue_push; however since ReleaseKeyedEvent
// blocks until it is matched up with a wait, we must perform the wait if
// queue_push sees us
LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);
assert(oldv == 0);
entry = InterlockedPopEntrySList(&qPtr->slist);
if (entry) {
// Try to abort
oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
if (oldv == 1)
return entry; // nobody saw us, we can just exit with the value
}
// Either we don't have an entry, or we are forced to wait because
// queue_push saw our block flag. So do the wait
NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
// block_flag has been reset by queue_push
if (!entry)
entry = InterlockedPopEntrySList(&qPtr->slist);
assert(entry);
return entry;
}
Вы также можете использовать аналогичный протокол, используя Slim Read Write блокировки и переменные условия, с быстрым путем без блокировки. Это обертки для событий с ключами, поэтому они могут потребовать больше ресурсов, чем непосредственное использование событий с ключами.
Вы можете заставить поток спать, используя функцию sigwait(). Вы можете разбудить поток с помощью pthread_kill. Это намного быстрее, чем условные переменные.
Вы пробовали условное ожидание? Когда очередь опустеет, просто начните ждать нового задания. Поток, помещающий задания в очередь, должен запустить сигнал. Таким образом, вы используете блокировки только тогда, когда очередь пуста.
Вы можете добавить сна, пока он ждет. Просто выберите самое большое ожидание, которое вы готовы иметь, затем сделайте что-то вроде этого (псевдокод, потому что я не помню синтаксис pthread):
WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
thing = get_from_queue()
if(thing == null) {
sleep(WAIT_TIME);
} else {
handle(thing);
}
}
Даже что-то короткое, например, 100 мс сна, должно значительно снизить нагрузку на процессор. Я не уверен, в какой момент переключение контекста сделает это хуже, чем занятое ожидание все же.