Событийный шаблон в Голанге

Я использую golang для реализации простого работника, управляемого событиями. Это вот так:

  go func() {
        for {
            select {
            case data := <-ch:
                time.Sleep(1)
                someGlobalMap[data.key] = data.value 
            }
        }
    }()

И основная функция создаст несколько подпрограмм, и каждая из них будет делать так:

ch <- data
fmt.Println(someGlobalMap[data.key])

Как вы можете видеть, поскольку моему работнику нужно некоторое время для выполнения работы, я получу нулевой результат в своей основной функции. Как я могу правильно контролировать этот рабочий процесс?

1 ответ

Решение

РЕДАКТИРОВАТЬ: Я, возможно, неправильно прочитал ваш вопрос, я вижу, что вы упоминаете, что основной запустит многие программы продюсеров. Я думал, что это было много потребительских программ и один производитель. Оставьте здесь ответ, если он может быть полезен для других, ищущих этот шаблон, хотя маркеры по-прежнему применимы к вашему делу.

Так что, если я правильно понимаю ваш вариант использования, вы не можете ожидать, что отправите на канал и сразу же прочитаете результаты. Вы не знаете, когда работник обработает эту посылку, вам нужно общаться между программами, и это делается с помощью каналов. Предполагая, что просто вызов функции с возвращаемым значением не работает в вашем сценарии, если вам действительно нужно отправить работнику, а затем блокировать, пока не получите результат, вы можете отправить канал как часть структуры данных, и блок- получить на него после отправки, т.е.

resCh := make(chan Result)
ch <- Data{key, value, resCh}
res := <- resCh

Но вы, вероятно, должны попытаться вместо этого разбить работу на ряд независимых шагов, см. Пост в блоге, на который я ссылался, в оригинальном ответе.


Оригинальный ответ, где я думал, что это был один производитель - модель с несколькими потребителями / работниками:

Это общий шаблон, для которого семантика Go-функций и каналов очень хорошо подходит. Есть несколько вещей, которые вы должны иметь в виду:

  • Основная функция не будет автоматически ожидать завершения выполнения процедур. Если больше ничего не нужно делать в основном, то программа завершается, и у вас нет ваших результатов.

  • Используемая вами глобальная карта не является поточно-ориентированной. Вам нужно синхронизировать доступ через мьютекс, но есть лучший способ - использовать выходной канал для результатов, который уже синхронизирован.

  • Вы можете использовать for..range по каналу и безопасно разделять канал между несколькими программами. Как мы увидим, это делает этот шаблон довольно элегантным для написания.

Детская площадка: https://play.golang.org/p/WqyZfwldqp

Дополнительные сведения о конвейерах Go и шаблонах параллелизма, о введении обработки ошибок, досрочного отмены и т. Д.: https://blog.golang.org/pipelines

Комментарий к коду, который вы упоминаете:

// could be a command-line flag, a config, etc.
const numGoros = 10

// Data is a similar data structure to the one mentioned in the question.
type Data struct {
    key   string
    value int
}

func main() {
    var wg sync.WaitGroup

    // create the input channel that sends work to the goroutines
    inch := make(chan Data)
    // create the output channel that sends results back to the main function
    outch := make(chan Data)

    // the WaitGroup keeps track of pending goroutines, you can add numGoros
    // right away if you know how many will be started, otherwise do .Add(1)
    // each time before starting a worker goroutine.
    wg.Add(numGoros)
    for i := 0; i < numGoros; i++ {
        // because it uses a closure, it could've used inch and outch automaticaly,
        // but if the func gets bigger you may want to extract it to a named function,
        // and I wanted to show the directed channel types: within that function, you
        // can only receive from inch, and only send (and close) to outch.
        //
        // It also receives the index i, just for fun so it can set the goroutines'
        // index as key in the results, to show that it was processed by different
        // goroutines. Also, big gotcha: do not capture a for-loop iteration variable
        // in a closure, pass it as argument, otherwise it very likely won't do what
        // you expect.
        go func(i int, inch <-chan Data, outch chan<- Data) {
            // make sure WaitGroup.Done is called on exit, so Wait unblocks
            // eventually.
            defer wg.Done()

            // range over a channel gets the next value to process, safe to share
            // concurrently between all goroutines. It exits the for loop once
            // the channel is closed and drained, so wg.Done will be called once
            // ch is closed.
            for data := range inch {
                // process the data...
                time.Sleep(10 * time.Millisecond)
                outch <- Data{strconv.Itoa(i), data.value}
            }
        }(i, inch, outch)
    }

    // start the goroutine that prints the results, use a separate WaitGroup to track
    // it (could also have used a "done" channel but the for-loop would be more complex, with a select).
    var wgResults sync.WaitGroup
    wgResults.Add(1)
    go func(ch <-chan Data) {
        defer wgResults.Done()

        // to prove it processed everything, keep a counter and print it on exit
        var n int
        for data := range ch {
            fmt.Println(data.key, data.value)
            n++
        }

        // for fun, try commenting out the wgResults.Wait() call at the end, the output
        // will likely miss this line.
        fmt.Println(">>> Processed: ", n)
    }(outch)

    // send work, wherever that comes from...
    for i := 0; i < 1000; i++ {
        inch <- Data{"main", i}
    }

    // when there's no more work to send, close the inch, so the goroutines will begin
    // draining it and exit once all values have been processed.
    close(inch)

    // wait for all goroutines to exit
    wg.Wait()

    // at this point, no more results will be written to outch, close it to signal
    // to the results goroutine that it can terminate.
    close(outch)

    // and wait for the results goroutine to actually exit, otherwise the program would
    // possibly terminate without printing the last few values.
    wgResults.Wait()
}

В реальных сценариях, где объем работы не известен заранее, закрытие входного канала может происходить, например, из сигнала SIGINT. Просто убедитесь, что ни один путь кода не может отправить работу после закрытия канала, так как это может вызвать панику.

Другие вопросы по тегам