Как смоделировать очередь в промеле?

Итак, я пытаюсь смоделировать замок CLH-RW в Promela.

Способ блокировки прост: действительно:

Очередь состоит из tail, к которому и читатели и писатели ставят в очередь узел, содержащий единственный bool succ_must_wait они делают это, создавая новый узел и обрабатывая его с помощью tail,

Хвост тем самым становится предшественником узла, pred,

Потом они крутятся pred.succ_must_wait пока это не false,

Читатели сначала увеличивают счетчик читателей ncritR а затем установить свой собственный флаг false, позволяя нескольким читателям одновременно находиться в критическом разделе. Освобождение блокировки чтения просто означает уменьшение ncritR снова.

Писатели ждут пока ncritR достигает нуля, затем войдите в критическую секцию. Они не устанавливают свой флаг false пока замок не будет снят.

Хотя я пытаюсь смоделировать это в промеле.

Моя текущая попытка (см. Ниже) пытается использовать массивы, где каждый узел в основном состоит из нескольких элементов массива.

Это терпит неудачу, потому что скажем A ставит себя в очередь, тогда B ставит себя в очередь. Тогда очередь будет выглядеть так:

S <- A <- B

куда S является сторожевым узлом

Проблема в том, что когда A работает до полноты и повторно ставит в очередь, очередь будет выглядеть

S <- A <- B <- A'

В реальном исполнении это абсолютно нормально, потому что A а также A' являются различными объектами узла. И с тех пор A.succ_must_wait будет установлен в false когда A сначала отпустил замок, B будет в конечном итоге добиться прогресса, и, следовательно, A' будет в конечном итоге добиться прогресса.

Что происходит в приведенной ниже модели на основе массива, это то, что A а также A' занимают одинаковые позиции массива, вызывая B упустить тот факт, что A снял блокировку, создав тем самым тупик, в котором B (неправильно) ждет A' вместо A а также A' ждет (правильно) B,

Возможное "решение" этого могло бы иметь A Подожди пока B подтверждает выпуск. Но это было бы не так, как работает замок.

Другим "решением" будет ожидание ИЗМЕНЕНИЯ в pred.succ_must_waitгде выпуск будет увеличиваться succ_must_waitвместо сброса 0,

Но я собираюсь смоделировать версию замка, где pred может измениться (т. е. когда узлу может быть разрешено игнорировать некоторых из его предшественников), и я не совсем уверен, что что-то вроде увеличения версии не вызовет проблемы с этим изменением.

Так какой же самый "умный" способ моделировать неявную очередь, подобную этой, в promela?

/* CLH-RW Lock */
/*pid: 0 = init, 1-2 = reader, 3-4 =  writer*/

ltl liveness{ 
    ([]<> reader[1]@progress_reader)
    && ([]<> reader[2]@progress_reader)
    && ([]<> writer[3]@progress_writer)
    && ([]<> writer[4]@progress_writer)
 }

bool initialised = 0;

byte ncritR;
byte ncritW;
byte tail;
bool succ_must_wait[5]
byte pred[5]

init{
    assert(_pid == 0);
    ncritR = 0;
    ncritW = 0;

    /*sentinel node*/
    tail =0;
    pred[0] = 0;
    succ_must_wait[0] = 0;
    initialised = 1;
}

active [2] proctype reader()
{
    assert(_pid >= 1);
    (initialised == 1)
    do
    :: else ->
        succ_must_wait[_pid] = 1;
        atomic {
            pred[_pid] = tail;
            tail = _pid;
        }

        (succ_must_wait[pred[_pid]] == 0)

        ncritR++;
        succ_must_wait[_pid] = 0;
        atomic {
            /*freeing previous node for garbage collection*/
            pred[_pid] = 0;
        }

        /*CRITICAL SECTION*/
progress_reader:
        assert(ncritR >= 1);
        assert(ncritW == 0);

        ncritR--;

        atomic {
            /*necessary to model the fact that the next access creates a new queue node*/
            if
            :: tail == _pid -> tail = 0;
            :: else ->
            fi
        }
    od
}

active [2] proctype writer()
{
    assert(_pid >= 1);
    (initialised == 1)
    do
    :: else -> 
        succ_must_wait[_pid] = 1;

        atomic {
            pred[_pid] = tail;
            tail = _pid;
        }

        (succ_must_wait[pred[_pid]] == 0)
        (ncritR == 0)
        atomic {
            /*freeing previous node for garbage collection*/
            pred[_pid] = 0;
        }
        ncritW++;

        /* CRITICAL SECTION */
progress_writer:
        assert(ncritR == 0);
        assert(ncritW == 1);

        ncritW--;
        succ_must_wait[_pid] = 0;

        atomic {
            /*necessary to model the fact that the next access creates a new queue node*/
            if
            :: tail == _pid -> tail = 0;
            :: else -> 
            fi
        }
    od
}

1 ответ

Прежде всего несколько замечаний:

  • Вам не нужно инициализировать ваши переменные 0, поскольку:

    Начальное значение по умолчанию для всех переменных - ноль.

    смотри документы.

  • Вам не нужно заключать одну инструкцию в atomic {} утверждение, поскольку любое элементарное утверждение выполняется атомарно. Для повышения эффективности процесса проверки, по возможности, вы должны использовать d_step {} вместо. Здесь вы можете найти связанные с этим вопросы о стека и потоке по теме.

  • init {} гарантированно иметь _pid == 0 когда выполняется одно из двух следующих условий:

    • нет active proctype объявлен
    • init {} объявлен раньше любого другого active proctype появляется в исходном коде

    Активные процессы, в том числе init {}, появляются в порядке появления внутри исходного кода. Все остальные процессы порождаются в порядке появления соответствующих run ... заявление.


Я выявил следующие проблемы в вашей модели:

  • Инструкция pred[_pid] = 0 бесполезно, потому что это место в памяти читается только после назначения pred[_pid] = tail

  • Когда вы освобождаете преемника узла, вы устанавливаете succ_must_wait[_pid] в 0 только и вы не сделаете недействительным экземпляр узла, на который ожидает ваш преемник. Это проблема, которую вы определили в своем вопросе, но не смогли ее решить. Решение, которое я предлагаю, состоит в том, чтобы добавить следующий код:

    pid j;
    for (j: 1..4) {
        if
            :: pred[j] == _pid -> pred[j] = 0;
            :: else -> skip;
        fi
    }
    

    Это должно быть заключено в atomic {} блок.

  • Вы правильно установили tail вернуться к 0 когда вы обнаружите, что узел, который только что покинул критическую секцию, также является last узел в очереди. Вы также правильно заключили эту операцию в atomic {} блок. Тем не менее, может случиться так, что - когда вы собираетесь войти в это atomic {} block- какой-то другой процесс - который все еще ждал в неактивном состоянии - решает выполнить начальный атомарный блок и копирует текущее значение tail - который соответствует узлу, который только что истек - в его собственном pred[_pid] место в памяти. Если теперь узел, который только что вышел из критической секции, пытается присоединиться к нему еще раз, устанавливая его собственное значение succ_must_wait[_pid] в 1, вы получите еще один случай циклического ожидания среди процессов. Правильный подход - объединить эту часть с кодом, освобождающим преемника.


Следующая встроенная функция может использоваться для освобождения преемника данного узла:

inline release_succ(i)
{
    d_step {
        pid j;
        for (j: 1..4) {
            if
                :: pred[j] == i ->
                    pred[j] = 0;
                :: else ->
                    skip;
            fi
        }
        succ_must_wait[i] = 0;
        if
            :: tail == _pid -> tail = 0;
            :: else -> skip;
        fi
    }
}

Полная модель выглядит следующим образом:

byte ncritR;
byte ncritW;
byte tail;
bool succ_must_wait[5];
byte pred[5];

init
{
    skip
}

inline release_succ(i)
{
    d_step {
        pid j;
        for (j: 1..4) {
            if
                :: pred[j] == i ->
                    pred[j] = 0;
                :: else ->
                    skip;
            fi
        }
        succ_must_wait[i] = 0;
        if
            :: tail == _pid -> tail = 0;
            :: else -> skip;
        fi
    }
}

active [2] proctype reader()
{
loop:
    succ_must_wait[_pid] = 1;
    d_step {
        pred[_pid] = tail;
        tail = _pid;
    }

trying:
    (succ_must_wait[pred[_pid]] == 0)

    ncritR++;
    release_succ(_pid);

    // critical section    
progress_reader:
    assert(ncritR > 0);
    assert(ncritW == 0);

    ncritR--;

    goto loop;
}

active [2] proctype writer()
{
loop:
    succ_must_wait[_pid] = 1;

    d_step {
        pred[_pid] = tail;
        tail = _pid;
    }

trying:
    (succ_must_wait[pred[_pid]] == 0) && (ncritR == 0)

    ncritW++;

    // critical section
progress_writer:
    assert(ncritR == 0);
    assert(ncritW == 1);

    ncritW--;

    release_succ(_pid);

    goto loop;
}

Я добавил следующие свойства в модель:

  • p0: писатель с _pid равно 4 проходит состояние прогресса бесконечно часто при условии, что ему дается возможность выполнять некоторые инструкции бесконечно часто:

    ltl p0 {
       ([]<> (_last == 4)) ->
       ([]<> writer[4]@progress_writer)
    };
    

    Это свойство должно быть true,

  • p1: в критическом разделе никогда не бывает более одного читателя:

    ltl p1 {
        ([] (ncritR <= 1))
    };
    

    Очевидно, мы ожидаем, что это свойство будет false в модели, которая соответствует вашей спецификации.

  • p2: в критическом разделе никогда не бывает более одного автора:

    ltl p2 {
        ([] (ncritW <= 1))
    };
    

    Это свойство должно быть true,

  • p3: нет ни одного узла, который является предшественником двух других узлов одновременно, если только такой узел не является узлом 0:

    ltl p3 {
        [] (
            (((pred[1] != 0) && (pred[2] != 0)) -> (pred[1] != pred[2])) &&
            (((pred[1] != 0) && (pred[3] != 0)) -> (pred[1] != pred[3])) &&
            (((pred[1] != 0) && (pred[4] != 0)) -> (pred[1] != pred[4])) &&
            (((pred[2] != 0) && (pred[3] != 0)) -> (pred[2] != pred[3])) &&
            (((pred[2] != 0) && (pred[4] != 0)) -> (pred[2] != pred[4])) &&
            (((pred[3] != 0) && (pred[4] != 0)) -> (pred[3] != pred[4]))
        )
    };
    

    Это свойство должно быть true,

  • p4: всегда верно, что когда автор _pid равно 4 пытается получить доступ к критическому разделу, тогда он в конечном итоге попадет туда:

    ltl p4 {
        [] (writer[4]@trying -> <> writer[4]@progress_writer)
    };
    

    Это свойство должно быть true,

Результат проверки соответствует нашим ожиданиям:

~$ spin -search -ltl p0 -a clhrw_lock.pml 
...
Full statespace search for:
    never claim             + (p0)
    assertion violations    + (if within scope of claim)
    acceptance   cycles     + (fairness disabled)
    invalid end states      - (disabled by never claim)

State-vector 68 byte, depth reached 3305, errors: 0
...

~$ spin -search -ltl p1 -a clhrw_lock.pml 
...
Full statespace search for:
    never claim             + (p1)
    assertion violations    + (if within scope of claim)
    acceptance   cycles     + (fairness disabled)
    invalid end states      - (disabled by never claim)

State-vector 68 byte, depth reached 1692, errors: 1
...

~$ spin -search -ltl p2 -a clhrw_lock.pml
...
Full statespace search for:
    never claim             + (p2)
    assertion violations    + (if within scope of claim)
    acceptance   cycles     + (fairness disabled)
    invalid end states      - (disabled by never claim)

State-vector 68 byte, depth reached 3115, errors: 0
...

~$ spin -search -ltl p3 -a clhrw_lock.pml 
...
Full statespace search for:
    never claim             + (p3)
    assertion violations    + (if within scope of claim)
    acceptance   cycles     + (fairness disabled)
    invalid end states      - (disabled by never claim)

State-vector 68 byte, depth reached 3115, errors: 0
...

~$ spin -search -ltl p4 -a clhrw_lock.pml 
...
Full statespace search for:
    never claim             + (p4)
    assertion violations    + (if within scope of claim)
    acceptance   cycles     + (fairness disabled)
    invalid end states      - (disabled by never claim)

State-vector 68 byte, depth reached 3115, errors: 0
...
Другие вопросы по тегам