Это идиоматический пул рабочих потоков в Go?
Я пытаюсь написать простой рабочий пул с программами.
- Код, который я написал, идиоматичен? Если нет, то что должно измениться?
- Я хочу иметь возможность установить максимальное количество рабочих потоков равным 5 и блокировать, пока рабочий не станет доступным, если все 5 заняты. Как бы я расширил это, чтобы иметь только пул максимум 5 рабочих? Я порождаю статические 5 горутин и даю каждому
work_channel
?
код:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
func main() {
var work_channel = make(chan string)
var results_channel = make(chan string)
// create goroutine per item in work_channel
go func() {
var c = 0
var wg sync.WaitGroup
for work := range work_channel {
wg.Add(1)
go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
c++
}
wg.Wait()
fmt.Println("closing results channel")
close(results_channel)
}()
// add work to the work_channel
go func() {
for c := 'a'; c < 'z'; c++ {
work_channel <- fmt.Sprintf("%c", c)
}
close(work_channel)
fmt.Println("sent work to work_channel")
}()
for x := range results_channel {
fmt.Printf("result: %s\n", x)
}
}
2 ответа
Ваше решение ни в коем случае не является рабочим пулом подпрограмм: ваш код не ограничивает одновременные подпрограммы и не "повторно использует" подпрограммы (он всегда запускает новую при получении нового задания).
Модель производитель-потребитель
Как я писал на взломщике паролей Bruteforce MD5, вы можете использовать шаблон " производитель-потребитель". У вас может быть назначенная программа продюсера, которая будет генерировать задания (что делать / вычислять) и отправлять их по каналу заданий. Вы можете иметь фиксированный пул потребительских программ (например, 5 из них), которые будут циклически проходить по каналу, по которому доставляются задания, и каждый будет выполнять / завершать полученные задания.
Производитель Goroutine может просто закрыть jobs
канал, когда все рабочие места были созданы и отправлены, должным образом сигнализируя потребителям, что больше рабочих мест не будет. for ... range
Конструкция на канале обрабатывает событие "close" и правильно завершается. Обратите внимание, что все задания, отправленные до закрытия канала, все равно будут доставлены.
Это привело бы к чистому дизайну, привело бы к фиксированному (но произвольному) количеству подпрограмм и всегда использовало бы 100% ЦП (если число подпрограмм больше, чем число ядер ЦП). Он также имеет то преимущество, что его можно "регулировать" с помощью правильного выбора пропускной способности канала (буферизованного канала) и количества пользовательских программ.
Обратите внимание, что эта модель иметь назначенный производитель программы не является обязательным. Вы также можете иметь несколько процедур для создания заданий, но затем вы должны синхронизировать их, чтобы закрыть только jobs
канал, когда все продюсерские программы завершены, производя задания - еще пытаясь отправить другую работу на jobs
канал, когда он уже закрыт, вызывает панику во время выполнения. Обычно создание рабочих мест обходится дешево и может производиться гораздо быстрее, чем их выполнение, поэтому такая модель для производства их за 1 цикл, в то время как многие их потребляют / выполняют, хороша на практике.
Обработка результатов:
Если у заданий есть результаты, вы можете выбрать назначенный канал результатов, по которому можно доставлять результаты ("отосланы назад"), или вы можете обработать результаты у потребителя, когда задание будет выполнено / завершено. Последнее может даже быть реализовано с помощью функции "обратного вызова", которая обрабатывает результаты. Важным моментом является то, могут ли результаты обрабатываться независимо или их нужно объединять (например, каркас-сокращение) или агрегировать.
Если вы идете с results
канал, вам также нужна процедура, которая получает значения от него, предотвращая блокировку потребителей (произойдет, если буфер results
будет заполнен).
С results
канал
Вместо того, чтобы отправлять просто string
значения в качестве заданий и результатов, я бы создал тип оболочки, который может содержать любую дополнительную информацию, и поэтому он гораздо более гибок:
type Job struct {
Id int
Work string
Result string
}
Обратите внимание, что Job
Структура также оборачивает результат, поэтому, когда мы отправляем обратно результат, он также содержит оригинал Job
как контекст - часто очень полезно. Также обратите внимание, что выгодно просто отправлять указатели (*Job
) на каналах вместо Job
значения, поэтому нет необходимости делать "бесчисленные" копии Job
с, а также размер Job
значение структуры становится неактуальным.
Вот как может выглядеть этот производитель-потребитель:
Я бы использовал 2 sync.WaitGroup
ценности, их роль будет следовать:
var wg, wg2 sync.WaitGroup
Производитель несет ответственность за создание заданий для выполнения:
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
Когда сделано (больше нет работы), jobs
канал закрыт, что сигнализирует потребителям, что больше рабочих мест не будет.
Обратите внимание, что produce()
видит jobs
канал как только отправка, потому что это то, что продюсер должен делать только с этим: посылать на него задания (помимо закрытия, но это также разрешено на канале только для отправки). Случайный прием в источнике будет ошибкой времени компиляции (обнаруженной рано, во время компиляции).
Ответственность потребителя состоит в том, чтобы получать рабочие места, пока рабочие места могут быть получены, и выполнять их:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
Обратите внимание, что consume()
видит jobs
канал только для приема; Потребителю нужно только получить от него. Точно так же results
Канал отправляется только для потребителя.
Также обратите внимание, что results
Канал не может быть закрыт здесь, так как есть несколько пользовательских подпрограмм, и только первая попытка закрыть его будет успешной, а дальнейшие вызовут панику во время выполнения! results
канал может (должен) быть закрыт после того, как все потребительские программы закончились, потому что тогда мы можем быть уверены, что дальнейшие значения (результаты) не будут отправлены на results
канал.
У нас есть результаты, которые необходимо проанализировать:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s\n", job.Result)
}
}
Как вы можете видеть, это также получает результаты, пока они могут прийти (до results
канал закрыт). results
канал для анализатора только приемный.
Обратите внимание на использование типов каналов: когда это достаточно, используйте только однонаправленный тип канала для раннего обнаружения и предотвращения ошибок во время компиляции. Используйте только двунаправленный тип канала, если вам нужны оба направления.
И вот как все они склеены:
func main() {
jobs := make(chan *Job, 100) // Buffered channel
results := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs, results)
}
// Start producing
go produce(jobs)
// Start analyzing:
wg2.Add(1)
go analyze(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait() // Wait analyzer to analyze all results
}
Пример вывода:
Вот пример вывода:
Как видите, результаты приходят и анализируются до того, как все задания будут поставлены в очередь:
worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
Попробуйте полное приложение на игровой площадке Go.
Без results
канал
Код значительно упрощается, если мы не используем results
канал, но пользовательские программы обрабатывают результат сразу (напечатайте его в нашем случае). В этом случае нам не нужно 2 sync.WaitGroup
значения (2-е необходимо было только дождаться завершения анализа).
Без results
Направить полное решение так:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs)
}
// Start producing
go produce(jobs)
wg.Wait() // Wait all consumers to finish processing jobs
}
Выход "как" с results
канал (но, конечно, порядок выполнения / завершения является случайным).
Попробуйте этот вариант на игровой площадке Go.
Вы можете реализовать счетный семафор, чтобы ограничить параллелизм выполнения подпрограмм.
var tokens = make(chan struct{}, 20)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
tokens <- struct{}{} // acquire a token before performing work
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
<-tokens // release the token
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
Это общая схема, используемая для ограничения количества работников. Конечно, вы можете изменить место выпуска / приобретения токенов в соответствии с вашим кодом.