Это идиоматический пул рабочих потоков в Go?

Я пытаюсь написать простой рабочий пул с программами.

  • Код, который я написал, идиоматичен? Если нет, то что должно измениться?
  • Я хочу иметь возможность установить максимальное количество рабочих потоков равным 5 и блокировать, пока рабочий не станет доступным, если все 5 заняты. Как бы я расширил это, чтобы иметь только пул максимум 5 рабочих? Я порождаю статические 5 горутин и даю каждому work_channel?

код:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

func main() {
    var work_channel = make(chan string)
    var results_channel = make(chan string)

    // create goroutine per item in work_channel
    go func() {
        var c = 0
        var wg sync.WaitGroup
        for work := range work_channel {
            wg.Add(1)
            go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
            c++
        }
        wg.Wait()
        fmt.Println("closing results channel")
        close(results_channel)
    }()

    // add work to the work_channel
    go func() {
        for c := 'a'; c < 'z'; c++ {
            work_channel <- fmt.Sprintf("%c", c)
        }
        close(work_channel)
        fmt.Println("sent work to work_channel")
    }()

    for x := range results_channel {
        fmt.Printf("result: %s\n", x)
    }
}

2 ответа

Решение

Ваше решение ни в коем случае не является рабочим пулом подпрограмм: ваш код не ограничивает одновременные подпрограммы и не "повторно использует" подпрограммы (он всегда запускает новую при получении нового задания).

Модель производитель-потребитель

Как я писал на взломщике паролей Bruteforce MD5, вы можете использовать шаблон " производитель-потребитель". У вас может быть назначенная программа продюсера, которая будет генерировать задания (что делать / вычислять) и отправлять их по каналу заданий. Вы можете иметь фиксированный пул потребительских программ (например, 5 из них), которые будут циклически проходить по каналу, по которому доставляются задания, и каждый будет выполнять / завершать полученные задания.

Производитель Goroutine может просто закрыть jobs канал, когда все рабочие места были созданы и отправлены, должным образом сигнализируя потребителям, что больше рабочих мест не будет. for ... range Конструкция на канале обрабатывает событие "close" и правильно завершается. Обратите внимание, что все задания, отправленные до закрытия канала, все равно будут доставлены.

Это привело бы к чистому дизайну, привело бы к фиксированному (но произвольному) количеству подпрограмм и всегда использовало бы 100% ЦП (если число подпрограмм больше, чем число ядер ЦП). Он также имеет то преимущество, что его можно "регулировать" с помощью правильного выбора пропускной способности канала (буферизованного канала) и количества пользовательских программ.

Обратите внимание, что эта модель иметь назначенный производитель программы не является обязательным. Вы также можете иметь несколько процедур для создания заданий, но затем вы должны синхронизировать их, чтобы закрыть только jobs канал, когда все продюсерские программы завершены, производя задания - еще пытаясь отправить другую работу на jobs канал, когда он уже закрыт, вызывает панику во время выполнения. Обычно создание рабочих мест обходится дешево и может производиться гораздо быстрее, чем их выполнение, поэтому такая модель для производства их за 1 цикл, в то время как многие их потребляют / выполняют, хороша на практике.

Обработка результатов:

Если у заданий есть результаты, вы можете выбрать назначенный канал результатов, по которому можно доставлять результаты ("отосланы назад"), или вы можете обработать результаты у потребителя, когда задание будет выполнено / завершено. Последнее может даже быть реализовано с помощью функции "обратного вызова", которая обрабатывает результаты. Важным моментом является то, могут ли результаты обрабатываться независимо или их нужно объединять (например, каркас-сокращение) или агрегировать.

Если вы идете с results канал, вам также нужна процедура, которая получает значения от него, предотвращая блокировку потребителей (произойдет, если буфер results будет заполнен).

С results канал

Вместо того, чтобы отправлять просто string значения в качестве заданий и результатов, я бы создал тип оболочки, который может содержать любую дополнительную информацию, и поэтому он гораздо более гибок:

type Job struct {
    Id     int
    Work   string
    Result string
}

Обратите внимание, что Job Структура также оборачивает результат, поэтому, когда мы отправляем обратно результат, он также содержит оригинал Job как контекст - часто очень полезно. Также обратите внимание, что выгодно просто отправлять указатели (*Job) на каналах вместо Job значения, поэтому нет необходимости делать "бесчисленные" копии Job с, а также размер Job значение структуры становится неактуальным.

Вот как может выглядеть этот производитель-потребитель:

Я бы использовал 2 sync.WaitGroup ценности, их роль будет следовать:

var wg, wg2 sync.WaitGroup

Производитель несет ответственность за создание заданий для выполнения:

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

Когда сделано (больше нет работы), jobs канал закрыт, что сигнализирует потребителям, что больше рабочих мест не будет.

Обратите внимание, что produce() видит jobs канал как только отправка, потому что это то, что продюсер должен делать только с этим: посылать на него задания (помимо закрытия, но это также разрешено на канале только для отправки). Случайный прием в источнике будет ошибкой времени компиляции (обнаруженной рано, во время компиляции).

Ответственность потребителя состоит в том, чтобы получать рабочие места, пока рабочие места могут быть получены, и выполнять их:

func consume(id int, jobs <-chan *Job, results chan<- *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
        results <- job
    }
}

Обратите внимание, что consume() видит jobs канал только для приема; Потребителю нужно только получить от него. Точно так же results Канал отправляется только для потребителя.

Также обратите внимание, что results Канал не может быть закрыт здесь, так как есть несколько пользовательских подпрограмм, и только первая попытка закрыть его будет успешной, а дальнейшие вызовут панику во время выполнения! results канал может (должен) быть закрыт после того, как все потребительские программы закончились, потому что тогда мы можем быть уверены, что дальнейшие значения (результаты) не будут отправлены на results канал.

У нас есть результаты, которые необходимо проанализировать:

func analyze(results <-chan *Job) {
    defer wg2.Done()
    for job := range results {
        fmt.Printf("result: %s\n", job.Result)
    }
}

Как вы можете видеть, это также получает результаты, пока они могут прийти (до results канал закрыт). results канал для анализатора только приемный.

Обратите внимание на использование типов каналов: когда это достаточно, используйте только однонаправленный тип канала для раннего обнаружения и предотвращения ошибок во время компиляции. Используйте только двунаправленный тип канала, если вам нужны оба направления.

И вот как все они склеены:

func main() {
    jobs := make(chan *Job, 100)    // Buffered channel
    results := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs, results)
    }
    // Start producing
    go produce(jobs)

    // Start analyzing:
    wg2.Add(1)
    go analyze(results)

    wg.Wait() // Wait all consumers to finish processing jobs

    // All jobs are processed, no more values will be sent on results:
    close(results)

    wg2.Wait() // Wait analyzer to analyze all results
}

Пример вывода:

Вот пример вывода:

Как видите, результаты приходят и анализируются до того, как все задания будут поставлены в очередь:

worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms

Попробуйте полное приложение на игровой площадке Go.

Без results канал

Код значительно упрощается, если мы не используем results канал, но пользовательские программы обрабатывают результат сразу (напечатайте его в нашем случае). В этом случае нам не нужно 2 sync.WaitGroup значения (2-е необходимо было только дождаться завершения анализа).

Без results Направить полное решение так:

var wg sync.WaitGroup

type Job struct {
    Id   int
    Work string
}

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

func consume(id int, jobs <-chan *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
    }
}

func main() {
    jobs := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs)
    }
    // Start producing
    go produce(jobs)

    wg.Wait() // Wait all consumers to finish processing jobs
}

Выход "как" с results канал (но, конечно, порядок выполнения / завершения является случайным).

Попробуйте этот вариант на игровой площадке Go.

Вы можете реализовать счетный семафор, чтобы ограничить параллелизм выполнения подпрограмм.

var tokens = make(chan struct{}, 20)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    tokens <- struct{}{} // acquire a token before performing work
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    <-tokens // release the token
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

Это общая схема, используемая для ограничения количества работников. Конечно, вы можете изменить место выпуска / приобретения токенов в соответствии с вашим кодом.

Другие вопросы по тегам