Голанг: Как работает выбор, когда задействовано несколько каналов?
Я нашел при использовании выбора на нескольких не буферизованных каналах, как
select {
case <- chana:
case <- chanb:
}
Даже когда оба канала имеют данные, но при обработке этого выбора, вызов, который выпадает в случае chana и case chanb, не сбалансирован.
package main
import (
"fmt"
_ "net/http/pprof"
"sync"
"time"
)
func main() {
chana := make(chan int)
chanb := make(chan int)
go func() {
for i := 0; i < 1000; i++ {
chana <- 100 * i
}
}()
go func() {
for i := 0; i < 1000; i++ {
chanb <- i
}
}()
time.Sleep(time.Microsecond * 300)
acount := 0
bcount := 0
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case <-chana:
acount++
case <-chanb:
bcount++
}
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
wg.Done()
}()
wg.Wait()
}
Запустите эту демонстрацию, когда один из танов, чан закончил чтение / запись, другой может остаться 999-1 слева.
Есть ли способ обеспечить баланс?
нашел связанную тему
golang-каналы, выберите-заявление
4 ответа
The Go select
Заявление не предвзято ни к каким (готовым) делам. Цитирую из спецификации:
Если одно или несколько сообщений могут продолжаться, то одно из них может быть выбрано с помощью равномерного псевдослучайного выбора. В противном случае, если есть случай по умолчанию, этот случай выбирается. Если регистр по умолчанию отсутствует, оператор SELECT блокируется до тех пор, пока не будет продолжено хотя бы одно из сообщений.
Если несколько сообщений могут продолжаться, один выбирается случайным образом. Это не идеальное случайное распределение, и спецификация не гарантирует этого, но оно случайное.
То, что вы испытываете, является результатом того, что Go Playground имеет GOMAXPROCS=1
( что вы можете проверить здесь), и планировщик выполнения подпрограмм не является вытесняющим. Это означает, что по умолчанию goroutines не выполняются параллельно. В случае блокировки выполняется операция (например, чтение из сети или попытка получить или отправить канал, который блокирует), а другая, готовая к запуску, продолжается.
А поскольку в вашем коде нет операции блокировки, подпрограммы не могут быть помещены в парк, и это может быть только одна из ваших подпрограмм "производителя", а другая может не планироваться (никогда).
Запуск вашего кода на моем локальном компьютере, где GOMAXPROCS=4
У меня очень "реалистичные" результаты. Запустив его несколько раз, получим:
finish one acount, bcount 1000 901
finish one acount, bcount 1000 335
finish one acount, bcount 1000 872
finish one acount, bcount 427 1000
Если вам нужно расставить приоритеты для одного случая, проверьте этот ответ: Принудительный приоритет оператора go select
Поведение по умолчанию select
не гарантирует равный приоритет, но в среднем он будет близок к нему. Если вам нужен гарантированный равный приоритет, вам не следует использовать select
, но вы можете сделать последовательность из 2 неблокирующих прием от 2 каналов, которая может выглядеть примерно так:
for {
select {
case <-chana:
acount++
default:
}
select {
case <-chanb:
bcount++
default:
}
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
Приведенные выше 2 неблокирующих приема истощают 2 канала с одинаковой скоростью (с одинаковым приоритетом), если оба значения питания, а если нет, то другой постоянно принимается без задержки или блокировки.
Следует отметить, что если ни один из каналов не предоставляет значений для приема, это будет в основном "занятый" цикл и, следовательно, потребляющий вычислительную мощность. Чтобы избежать этого, мы можем обнаружить, что ни один из каналов не был готов, а затем использовать select
оператор с обоими получателями, который затем будет блокироваться, пока один из них не будет готов к приему, не тратя ресурсы ЦП:
for {
received := 0
select {
case <-chana:
acount++
received++
default:
}
select {
case <-chanb:
bcount++
received++
default:
}
if received == 0 {
select {
case <-chana:
acount++
case <-chanb:
bcount++
}
}
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
Для получения более подробной информации о планировании работы с расписанием см. Следующие вопросы:
Количество потоков, используемых средой выполнения Go
Goroutines 8kb и Windows OS поток 1 МБ
Почему это не создает много потоков, когда многие goroutines заблокированы в записи файла в golang?
Как указано в комментарии, если вы хотите обеспечить баланс, вы можете просто отказаться от использования select
в целом в программе чтения и полагаемся на синхронизацию, обеспечиваемую небуферизованными каналами:
go func() {
for {
<-chana
acount++
<-chanb
bcount++
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
wg.Done()
}()
Похоже, что все остальные комментаторы пропустили здесь настоящую ошибку.
Причина, по которой это не сбалансировано, заключается в том, что его буквально невозможно сбалансировать с приведенным выше кодом. Это ОДИН поток, поэтому цикл for может обрабатывать только chana ИЛИ chanb при каждом проходе цикла. ТАК: один из чанов ВСЕГДА достигает 1000 первым.
Оператор if использует ||, что означает, что когда ЛЮБОЕ из них достигает 1000, он останавливается.
ПРОСТОЕ ИСПРАВЛЕНИЕ ОШИБКИ ЗДЕСЬ: изменить || в && в операторе if
Отредактировано: Вы можете балансировать и со стороны предложения, но ответ @icza кажется мне лучшим вариантом, чем этот, и также объясняет планирование, которое вызвало это в первую очередь. Удивительно, но он был односторонним даже на моей (виртуальной) машине.
Вот кое-что, что может сбалансировать две процедуры со стороны предложения (как-то не похоже на Playground).
package main
import (
"fmt"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"
)
func main() {
chana := make(chan int)
chanb := make(chan int)
var balanceSwitch int32
go func() {
for i := 0; i < 1000; i++ {
for atomic.LoadInt32(&balanceSwitch) != 0 {
fmt.Println("Holding R1")
time.Sleep(time.Nanosecond * 1)
}
chana <- 100 * i
fmt.Println("R1: Sent i", i)
atomic.StoreInt32(&balanceSwitch, 1)
}
}()
go func() {
for i := 0; i < 1000; i++ {
for atomic.LoadInt32(&balanceSwitch) != 1 {
fmt.Println("Holding R2")
time.Sleep(time.Nanosecond * 1)
}
chanb <- i
fmt.Println("R2: Sent i", i)
atomic.StoreInt32(&balanceSwitch, 0)
}
}()
time.Sleep(time.Microsecond * 300)
acount := 0
bcount := 0
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case <-chana:
acount++
case <-chanb:
bcount++
}
fmt.Println("Acount Bcount", acount, bcount)
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
wg.Done()
}()
wg.Wait()
}
Путем изменения atomic.LoadInt32(&balanceSwitch) != XX
а также atomic.StoreInt32(&balanceSwitch, X)
или другие механизмы, вы можете сопоставить его с любым количеством подпрограмм. Возможно, это не лучшая вещь, но если это требование, вам, возможно, придется рассмотреть такие варианты. Надеюсь это поможет.