Параллельная обработка N потоков?

Уточненный вопрос:

У меня есть следующий скрипт для доступа к веб и локальным ресурсам. Я хочу ограничить количество подключений к Интернету N (сайт работает медленно) и доступ к локальным ресурсам (executeLocalNetworkProcess) не должен блокировать другие веб-запросы. (Так что он всегда будет запускать N веб-запросов).

В некоторых категориях очень мало предметов, а в других - много. Параллельное выполнение должно выполняться для всех элементов всех категорий, чтобы использовать веб-соединения.

let categories = getCategories() // get a seq of category from web service
for c in categories do
    getItemsByCategory c // returns a seq of item from web 
    |> Seq.iter (fun (item, _) -> // Want to process N items in parallel
        getMoreDataFromWeb item // from web
        executeLocalNetworkProcess item // local disk/network access; may take a while
        downloadBigFile item  // from web
        )

Каков наилучший подход для реализации этого в F#?

2 ответа

Возможно, вы захотите включить исходный код модуля PSeq из F# PowerPack в свою собственную базовую библиотеку. Затем вы можете просто позвонить PSeq.iter:

for category, description, _ as c in getCategories() do
    printfn "%s" category
    getItemsByCategory c
    |> PSeq.iter(fun (item, description, _) -> process item)

Я сделал нечто подобное ранее, разбив последовательность на партии по размеру n и параллельная обработка партии.

Мы можем создать пакеты последовательности, используя код в этом ответе SO: /questions/37473347/samyij-idiomatichnyij-sposob-napisaniya-paketov-razmera-seq-v-f/37473361#37473361

Оттуда нам просто нужно пройти через элементы в каждом пакете параллельно. Я хотел бы положить его в Array.Parallel модуль.

module Array =
    module Parallel =
        let batchIter size action array =
            let batchesOf n =
                Seq.mapi (fun i v -> i / n, v) >>
                Seq.groupBy fst >>
                Seq.map snd >>
                Seq.map (Seq.map snd)

            for batch in array |> batchesOf size do 
                batch
                |> Seq.toArray    
                |> Array.Parallel.iter action

Следующий код разбивает список из 100 элементов на пакеты по 8 и печатает элементы каждой партии параллельно.

[1..100]
|> Array.Parallel.batchIter 8 (printfn "%d")

Чтобы применить это к вашему коду, вы смотрите на что-то вроде этого:

let categories = getCategories()
for c in categories do
    match c with | (category, description, _) -> printfn "%s" category
    getItemsByCategory c
    |> Array.Parallel.batchIter 8 (fun (item, description, _) ->
        process item
        )

Этот подход, однако, будет ожидать завершения обработки всего пакета, прежде чем запускать следующий пакет.

Другие вопросы по тегам