Принудительный приоритет оператора go select
У меня есть следующий кусок кода:
func sendRegularHeartbeats(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
sendHeartbeat()
}
}
}
Эта функция выполняется в специальной подпрограмме и отправляет сообщение сердцебиения каждую секунду. Весь процесс должен быть остановлен немедленно, когда контекст отменен.
Теперь рассмотрим следующий сценарий:
ctx, cancel := context.WithCancel(context.Background())
cancel()
go sendRegularHeartbeats(ctx)
Это запускает процедуру сердцебиения с закрытым контекстом. В таком случае я не хочу, чтобы пульс передавался. Итак, первый case
Блок в выборе должен быть введен немедленно.
Тем не менее, кажется, что порядок, в котором case
Оценка блоков не гарантируется, и что код иногда отправляет сообщение пульса, даже если контекст уже отменен.
Как правильно реализовать такое поведение?
Я мог бы добавить проверку isContextclosed во второй case
, но это больше похоже на уродливое решение проблемы.
3 ответа
Обратите внимание:
Ваш пример будет работать так, как вы намереваетесь, как будто контекст уже отменен, когда sendRegularHeartbeats()
называется, case <-ctx.Done()
коммуникация будет единственной, готовой продолжить и поэтому выбранной. Другой case <-time.After(1 * time.Second)
будет готов продолжить только через 1 секунду, поэтому он не будет выбран вначале. Но чтобы явно обрабатывать приоритеты, когда несколько дел могут быть готовы, читайте дальше.
в отличие от case
ветви switch
заявление (где порядок оценки - это порядок, в котором они перечислены), в case
ветви select
заявление.
Цитирование из Spec: Выберите утверждения:
Если одно или несколько сообщений могут продолжаться, то одно из них может быть выбрано с помощью равномерного псевдослучайного выбора. В противном случае, если есть случай по умолчанию, этот случай выбирается. Если регистр по умолчанию отсутствует, оператор SELECT блокируется до тех пор, пока не будет продолжено хотя бы одно из сообщений.
Если больше сообщений может продолжаться, один выбирается случайным образом. Период.
Если вы хотите сохранить приоритет, вы должны сделать это самостоятельно (вручную). Вы можете сделать это, используя несколько select
операторы (последующие, не вложенные), перечисляющие те с более высоким приоритетом в более ранних select
также обязательно добавьте default
ветвь, чтобы избежать блокировки, если они не готовы продолжить. Ваш пример требует 2 select
выписки, первая проверка <-ctx.Done()
поскольку это тот, для которого вы хотите более высокий приоритет.
Я также рекомендую использовать один time.Ticker
вместо звонка time.After()
в каждой итерации (time.After()
также использует time.Ticker
под капотом, но он не использует его повторно, просто "выбрасывает" и создает новый при следующем вызове).
Вот пример реализации:
func sendRegularHeartbeats(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
default:
}
select {
case <-ctx.Done():
return
case <-ticker.C:
sendHeartbeat()
}
}
}
Это не будет отправлять пульс, если контекст уже отменен, когда sendRegularHeartbeats()
называется, как вы можете проверить / проверить его на Go Playground.
Если вы задержите cancel()
звоните в течение 2,5 секунд, после чего будет отправлено ровно 2 сердцебиения
ctx, cancel := context.WithCancel(context.Background())
go sendRegularHeartbeats(ctx)
time.Sleep(time.Millisecond * 2500)
cancel()
time.Sleep(time.Second * 2)
Попробуйте это на Go Playground.
Принятый ответ имеет неверное предложение:
func sendRegularHeartbeats(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
//first select
select {
case <-ctx.Done():
return
default:
}
//second select
select {
case <-ctx.Done():
return
case <-ticker.C:
sendHeartbeat()
}
}
}
Это не помогает из-за следующего сценария:
- оба канала пусты
- первый выбор прогонов
- оба канала получают сообщение одновременно
- вы находитесь в той же вероятностной игре, как если бы вы ничего не сделали в первом выборе
Альтернативным, но все же несовершенным способом является защита от одновременных событий Done() ("неправильный выбор") после использования события тикера, т.е.
func sendRegularHeartbeats(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
//select as usual
select {
case <-ctx.Done():
return
case <-ticker.C:
//give priority to a possible concurrent Done() event non-blocking way
select {
case <-ctx.Done():
return
default:
}
sendHeartbeat()
}
}
}
Предостережение: проблема с этим в том, что он допускает путаницу "достаточно близких" событий - например, даже если событие тикера появилось раньше, событие Done произошло достаточно быстро, чтобы предупредить сердцебиение. На данный момент не существует идеального решения.
Если крайне важно поддерживать этот приоритет операций, вы можете:
- Потребляй из каждого канала в отдельной горутине
- Пусть каждая из этих программ запишет сообщение в общий третий канал с указанием его типа
- Попросите третью программу потреблять из этого канала, читая полученные сообщения, чтобы определить, является ли это галочкой и следует ли
sendHeartbeat
или если это отмена, и он должен выйти
Таким образом, сообщения, полученные на других каналах, (вероятно, вы не можете гарантировать порядок выполнения параллельных подпрограмм) поступают на третий канал в том порядке, в котором они запускаются, что позволяет вам обрабатывать их надлежащим образом.
Однако стоит отметить, что в этом, вероятно, нет необходимости. select
не гарантирует какой case
будет выполняться, если несколько случаев удастся одновременно. Это, вероятно, редкое событие; отмена и тикер должны были бы сработать прежде, чем любой из них был обработан select
, В подавляющем большинстве случаев только одна или другая будет запускаться на любой итерации цикла, поэтому она будет вести себя точно так, как ожидается. Если вы можете терпеть редкие случаи срабатывания одного дополнительного тактового импульса после отмены, вам лучше сохранить код, который у вас есть, так как он более эффективен и более читабелен.