Как добиться безблокировочного, но блокирующего поведения?

Я реализую очередь без единого производителя без блокировки для одного интенсивного сетевого приложения. У меня есть куча рабочих потоков, получающих работу в своих отдельных очередях, которые они затем удаляют и обрабатывают.

Удаление блокировок из этих очередей значительно улучшило производительность при высокой нагрузке, но они больше не блокируются, когда очереди пусты, что, в свою очередь, приводит к стремительному увеличению загрузки ЦП.

Как я могу эффективно заставить поток блокироваться, пока он не сможет успешно удалить что-то из очереди или не будет убит / прерван?

6 ответов

Решение

Если вы работаете в Linux, посмотрите на использование Futex. Он обеспечивает производительность реализации без блокировки, используя атомарные операции, а не вызовы ядра, как мьютекс, но если вам нужно установить процесс в режим ожидания из-за того, что какое-то условие не является истинным (то есть конфликт блокировки), он будет затем сделайте соответствующие вызовы ядра, чтобы перевести процесс в спящий режим и вернуть его в рабочее состояние в будущем. По сути, это очень быстрый семафор.

В Linux futex может использоваться для блокировки потока. Но имейте в виду, что Futexes являются хитрыми!

ОБНОВЛЕНИЕ: условные переменные гораздо безопаснее использовать, чем фьютексы, и они более переносимы. Однако условная переменная используется в сочетании с мьютексом, поэтому, строго говоря, результат больше не будет свободным от блокировки. Однако, если вашей основной целью является производительность (а не гарантия глобального прогресса), а заблокированная часть (то есть условие для проверки после пробуждения потока) мала, может случиться так, что вы получите удовлетворительные результаты без необходимости углубляться в тонкости интеграции фьютексов в алгоритм.

Если вы работаете в Windows, вы не сможете использовать фьютексы, но в Windows Vista есть похожий механизм, называемый Keyed Events. К сожалению, это не является частью опубликованного API (это собственный API NTDLL), но вы можете использовать его, если вы принимаете предостережение, что оно может измениться в будущих версиях Windows (и вам не нужно работать на до-Vista ядра). Обязательно прочитайте статью, на которую я ссылался выше. Вот непроверенный эскиз того, как это может работать:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

Вы также можете использовать аналогичный протокол, используя Slim Read Write блокировки и переменные условия, с быстрым путем без блокировки. Это обертки для событий с ключами, поэтому они могут потребовать больше ресурсов, чем непосредственное использование событий с ключами.

Вы можете заставить поток спать, используя функцию sigwait(). Вы можете разбудить поток с помощью pthread_kill. Это намного быстрее, чем условные переменные.

Вы пробовали условное ожидание? Когда очередь опустеет, просто начните ждать нового задания. Поток, помещающий задания в очередь, должен запустить сигнал. Таким образом, вы используете блокировки только тогда, когда очередь пуста.

https://computing.llnl.gov/tutorials/pthreads/

Вы можете добавить сна, пока он ждет. Просто выберите самое большое ожидание, которое вы готовы иметь, затем сделайте что-то вроде этого (псевдокод, потому что я не помню синтаксис pthread):

WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
   thing = get_from_queue()
   if(thing == null) {
       sleep(WAIT_TIME);
   } else {
       handle(thing);
   }
}

Даже что-то короткое, например, 100 мс сна, должно значительно снизить нагрузку на процессор. Я не уверен, в какой момент переключение контекста сделает это хуже, чем занятое ожидание все же.

Другие вопросы по тегам