Принудительный приоритет оператора go select

У меня есть следующий кусок кода:

func sendRegularHeartbeats(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-time.After(1 * time.Second):
            sendHeartbeat()
        }
    }
}

Эта функция выполняется в специальной подпрограмме и отправляет сообщение сердцебиения каждую секунду. Весь процесс должен быть остановлен немедленно, когда контекст отменен.

Теперь рассмотрим следующий сценарий:

ctx, cancel := context.WithCancel(context.Background())
cancel()
go sendRegularHeartbeats(ctx)

Это запускает процедуру сердцебиения с закрытым контекстом. В таком случае я не хочу, чтобы пульс передавался. Итак, первый case Блок в выборе должен быть введен немедленно.

Тем не менее, кажется, что порядок, в котором case Оценка блоков не гарантируется, и что код иногда отправляет сообщение пульса, даже если контекст уже отменен.

Как правильно реализовать такое поведение?

Я мог бы добавить проверку isContextclosed во второй case, но это больше похоже на уродливое решение проблемы.

3 ответа

Решение

Обратите внимание:

Ваш пример будет работать так, как вы намереваетесь, как будто контекст уже отменен, когда sendRegularHeartbeats() называется, case <-ctx.Done() коммуникация будет единственной, готовой продолжить и поэтому выбранной. Другой case <-time.After(1 * time.Second) будет готов продолжить только через 1 секунду, поэтому он не будет выбран вначале. Но чтобы явно обрабатывать приоритеты, когда несколько дел могут быть готовы, читайте дальше.


в отличие от case ветви switch заявление (где порядок оценки - это порядок, в котором они перечислены), в case ветви select заявление.

Цитирование из Spec: Выберите утверждения:

Если одно или несколько сообщений могут продолжаться, то одно из них может быть выбрано с помощью равномерного псевдослучайного выбора. В противном случае, если есть случай по умолчанию, этот случай выбирается. Если регистр по умолчанию отсутствует, оператор SELECT блокируется до тех пор, пока не будет продолжено хотя бы одно из сообщений.

Если больше сообщений может продолжаться, один выбирается случайным образом. Период.

Если вы хотите сохранить приоритет, вы должны сделать это самостоятельно (вручную). Вы можете сделать это, используя несколько select операторы (последующие, не вложенные), перечисляющие те с более высоким приоритетом в более ранних select также обязательно добавьте default ветвь, чтобы избежать блокировки, если они не готовы продолжить. Ваш пример требует 2 select выписки, первая проверка <-ctx.Done() поскольку это тот, для которого вы хотите более высокий приоритет.

Я также рекомендую использовать один time.Ticker вместо звонка time.After() в каждой итерации (time.After() также использует time.Ticker под капотом, но он не использует его повторно, просто "выбрасывает" и создает новый при следующем вызове).

Вот пример реализации:

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            sendHeartbeat()
        }
    }
}

Это не будет отправлять пульс, если контекст уже отменен, когда sendRegularHeartbeats() называется, как вы можете проверить / проверить его на Go Playground.

Если вы задержите cancel() звоните в течение 2,5 секунд, после чего будет отправлено ровно 2 сердцебиения

ctx, cancel := context.WithCancel(context.Background())
go sendRegularHeartbeats(ctx)
time.Sleep(time.Millisecond * 2500)
cancel()
time.Sleep(time.Second * 2)

Попробуйте это на Go Playground.

Принятый ответ имеет неверное предложение:

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        //first select 
        select {
        case <-ctx.Done():
            return
        default:
        }

        //second select
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            sendHeartbeat()
        }
    }
}

Это не помогает из-за следующего сценария:

  1. оба канала пусты
  2. первый выбор прогонов
  3. оба канала получают сообщение одновременно
  4. вы находитесь в той же вероятностной игре, как если бы вы ничего не сделали в первом выборе

Альтернативным, но все же несовершенным способом является защита от одновременных событий Done() ("неправильный выбор") после использования события тикера, т.е.

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {            
        //select as usual
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            //give priority to a possible concurrent Done() event non-blocking way
            select {
              case <-ctx.Done():
              return
            default:
            }
            sendHeartbeat()
        }
    }
}

Предостережение: проблема с этим в том, что он допускает путаницу "достаточно близких" событий - например, даже если событие тикера появилось раньше, событие Done произошло достаточно быстро, чтобы предупредить сердцебиение. На данный момент не существует идеального решения.

Если крайне важно поддерживать этот приоритет операций, вы можете:

  • Потребляй из каждого канала в отдельной горутине
  • Пусть каждая из этих программ запишет сообщение в общий третий канал с указанием его типа
  • Попросите третью программу потреблять из этого канала, читая полученные сообщения, чтобы определить, является ли это галочкой и следует ли sendHeartbeat или если это отмена, и он должен выйти

Таким образом, сообщения, полученные на других каналах, (вероятно, вы не можете гарантировать порядок выполнения параллельных подпрограмм) поступают на третий канал в том порядке, в котором они запускаются, что позволяет вам обрабатывать их надлежащим образом.

Однако стоит отметить, что в этом, вероятно, нет необходимости. select не гарантирует какой case будет выполняться, если несколько случаев удастся одновременно. Это, вероятно, редкое событие; отмена и тикер должны были бы сработать прежде, чем любой из них был обработан select, В подавляющем большинстве случаев только одна или другая будет запускаться на любой итерации цикла, поэтому она будет вести себя точно так, как ожидается. Если вы можете терпеть редкие случаи срабатывания одного дополнительного тактового импульса после отмены, вам лучше сохранить код, который у вас есть, так как он более эффективен и более читабелен.

Другие вопросы по тегам