disruptor: получение слотов перед лицом целочисленного переполнения

Я реализую шаблон разрушителя для межпотокового взаимодействия в C++ для нескольких производителей. В реализации от LMAX next(n) метод в MultiProducerSequencer.java использует целые числа со знаком (хорошо, это Java), но порт C++ (disruptor--) использует целые числа со знаком. После очень (очень) длительного времени переполнение приведет к неопределенному поведению.

Целые числа без знака имеют несколько преимуществ:

  • правильное поведение при переполнении
  • нет необходимости в 64-битных целых

Вот мой подход к заявке на n слотов (источник прилагается в конце): next_ индекс следующего свободного слота, который может быть востребован, tail_ последний свободный слот, который может быть востребован (будет обновлен где-то еще). n меньше размера буфера. Мой подход заключается в нормализации следующего и хвостового положения для промежуточных вычислений путем вычитания хвоста из следующего. Добавление n нормализовать следующий norm должен быть меньше размера буфера, чтобы успешно запросить интервалы между next_ а также next_+n, Предполагается, что norm + n не переполнится.

1) это правильно или делает next_ пройти tail_ в некоторых случаях? Это работает с меньшими целочисленными типами как uint32_t или же uint16_t если размер буфера и n ограничены, например, до 1/10 * максимальное целое число этих типов.
2) Если это не правильно, то я хотел бы знать конкретный случай.
3) Что-то еще не так или что можно улучшить? (Я опустил кешлайн)

class msg_ctrl
{
public:
    inline msg_ctrl();

    inline int claim(size_t n, uint64_t& seq);
    inline int publish(size_t n, uint64_t seq);
    inline int tail(uint64_t t);

public:
    std::atomic<uint64_t> next_;
    std::atomic<uint64_t> head_;
    std::atomic<uint64_t> tail_;
};

// Implementation -----------------------------------------

msg_ctrl::msg_ctrl() : next_(2), head_(1), tail_(0)
{}

int msg_ctrl::claim(size_t n, uint64_t& seq)
{
    uint64_t const size = msg_buffer::size();
    if (n > 1024) // please do not try to reserve too much slots
        return -1;

    uint64_t curr = 0;
    do
    {
        curr = next_.load();

        uint64_t tail = tail_.load();
        uint64_t norm = curr - tail;
        uint64_t next = norm + n;

        if (next > size)
            std::this_thread::yield(); // todo: some wait strategy
        else if (next_.compare_exchange_weak(curr, curr + n))
            break;

    } while (true);

    seq = curr;
    return 0;
}

int msg_ctrl::publish(size_t n, uint64_t seq)
{
    uint64_t tmp = seq-1;
    uint64_t val = seq+n-1;
    while (!head_.compare_exchange_weak(tmp, val))
    {
        tmp = seq-1;
        std::this_thread::yield();
    }
    return 0;
}

int msg_ctrl::tail(uint64_t t)
{
    tail_.store(t);
    return 0;
}

Публикация в кольцевом буфере будет выглядеть так:

size_t n = 15;
uint64_t seq = 0;
msg_ctrl->claim(n, seq);

//fill items in buffer
buffer[seq + 0] = an item
buffer[seq + 1] = an item
...
buffer[seq + n-1] = an item

msg_ctrl->publish(n, seq);

0 ответов

Другие вопросы по тегам