Заблокируйте свободный класс атомарного состояния - это правильно?

Я просто ищу обратную связь (очевидные недостатки / способы ее улучшения) о моей попытке реализовать атомарное чтение / запись в структуре.

Там будет один поток записи и несколько потоков чтения. Цель состоит в том, чтобы не дать читателю получить непоследовательное представление о структуре, не слишком мешая писателю.

Я использую атомарный примитив fetch-and-add, в данном случае предоставленный платформой Qt.

Например:

/* global */
OneWriterAtomicState<Point> atomicState;

/* Writer */
while(true) {  
  MyStruct s = atomicState.getState()
  s.x += 2; s.y += 2;
  atomicState.setState(s);
}

/* Reader */
while(true) {  
  MyStruct s = atomicState.getState()
  drawBox(s.x,s.y);
}

Реализация OneWriterAtomicState:

template <class T>
class OneWriterAtomicState
{
public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
    }

    void setState(T& state) {
        this->seqNumber.fetchAndAddOrdered(1);
        this->state = state;
        this->seqNumber.fetchAndAddOrdered(1);
    }

    T getState(){
        T result;
        int seq;
        bool seq_changed = true;

        /* repeat while seq is ODD or if seq changes during read operation */
        while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
            result = this->state;
            seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
        }
        return result;
    }


private:
    QAtomicInt seqNumber;
    T state;
} 

Вот вторая версия (memcpy, читатель уступает, надеюсь, исправил getState()):

template <class T>
class OneWriterAtomicState
{

public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
        /* Force a compile-time error if T is NOT a type we can copy with memcpy */
        Q_STATIC_ASSERT(!QTypeInfo<T>::isStatic);
    }

    void setState(T* state) {
        this->seqNumber.fetchAndAddOrdered(1);
        memcpy(&this->state,state,sizeof(T));
        this->seqNumber.fetchAndAddOrdered(1);
    }

    void getState(T* result){
        int seq_before;
        int seq_after  = this->seqNumber.fetchAndAddOrdered(0);
        bool seq_changed = true;
        bool firstIteration = true;

        /* repeat while seq_before is ODD or if seq changes during read operation */
        while( ((seq_before=seq_after) & 0x01) || seq_changed ) {

            /* Dont want to yield on first attempt */
            if(!firstIteration) {
                /* Give the writer a chance to finish */
                QThread::yieldCurrentThread();
            } else firstIteration = false;

            memcpy(result,&this->state,sizeof(T));
            seq_after = this->seqNumber.fetchAndAddOrdered(0);
            seq_changed = (seq_before!=seq_after);
        }
    }

    bool isInitialized() {  return (seqNumber>0); }

private:
    QAtomicInt seqNumber;
    T state;
} ;

#endif // ONEWRITERATOMICSTATE_H

3 ответа

Решение

Алгоритм не совсем правильный. Вот одно из возможных чередований потоков, когда читатель получает противоречивые данные:

state initialized to {0,0} and seqNumber to 0

Writer:
seqNumber = 1;
state.x = 1;

Reader:
seq = seqNumber; //1
result = state; //{1,0}
seq_changed = (seqNumber != seq); //false

Writer:
state.y = 1;
seqNumber = 2;

Reader:
jumps back to the start of the loop
seq = seqNumber; //2
steps out of the loop because seq == 2 and seq_changed == false

Так что проблема в том, что seqNumber читается в двух местах, и писатель может обновлять значение между чтениями.

while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
    result = this->state;
    seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
    //If writer updates seqNumber here to even number bad things may happen
}

Это должно быть прочитано только один раз за итерацию:

T getState(){
    T result;
    int seq;
    int newseq = seqNumber.fetchAndAddOrdered(0);
    bool seq_changed = true;

    while( (seq = newseq) & 0x01 || seq_changed ) {
        result = state;
        newseq = seqNumber.fetchAndAddOrdered(0);
        seq_changed = (newseq != seq);
    }
    return result;
}

Я считаю, что это должно работать правильно, но я не буду ничего гарантировать.:) По крайней мере, вы должны написать тестовую программу, подобную той, что в вашем примере, но добавить проверку на несовместимые значения в считывателе.

Стоит учитывать, что использование атомарного приращения (fetchAndAdd) является своего рода излишним. Там только одна тема пишет seqNumberТаким образом, вы можете выполнять простые атомарные операции хранения и загрузки, и они могут быть реализованы намного быстрее на многих процессорах. Однако я не знаю, возможны ли эти операции с QAtomicInt; документация очень неясна по этому поводу.

edit: и wilx прав, T должен быть тривиально копируемым типом

Я думаю, что это будет работать только в том случае, если TОператор присваивания копии является примитивным и делает в основном только побитовое копирование. Для более сложных T Вы можете получить несогласованное состояние во время выполнения result = this->state;,

Итак, я бы предложил использовать какие-то рулки с предпочтением писателя.

Если у вас есть планирование потоков на основе приоритетов и читатель имеет более высокий приоритет, чем писатель, вы можете столкнуться с livelock. Представьте, что писатель начинает записывать значение, а затем читатель начинает активно ждать. Из-за более высокого приоритета читателя у писателя никогда не будет шанса закончить письмо.

Решением было бы добавить крошечную задержку в цикл ожидания.

Другие вопросы по тегам