Распараллелить код во вложенных циклах

Вы всегда слышите, что функциональный код по своей природе проще распараллелить, чем нефункциональный код, поэтому я решил написать функцию, которая выполняет следующее:

С учетом ввода строк, суммируйте количество уникальных символов для каждой строки. Итак, учитывая вход [ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ]наш метод вернет a: 6; b: 6; c: 8,

Вот что я написал:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

Этот код идеально распараллеливаем, потому что каждая строка в input могут быть обработаны в своем собственном потоке. Это не так просто, как кажется, так как innerloop добавляет элементы на карту, совместно используемую всеми входными данными.

Я хотел бы, чтобы внутренний цикл был выделен в свой собственный поток, и я не хочу использовать какое-либо изменяемое состояние. Как бы я переписать эту функцию, используя рабочий процесс Async?

4 ответа

Решение

Как уже указывалось, существует конфликт обновления, если вы пытаетесь заставить разные потоки обрабатывать разные входные строки, поскольку каждый поток может увеличивать счетчик каждой буквы. Вы можете сделать так, чтобы каждый поток создавал свою собственную карту, а затем "складывал все карты", но этот последний шаг может быть дорогостоящим (и не очень подходящим для использования потоков из-за общих данных). Я думаю, что большие входные данные, скорее всего, будут работать быстрее, используя алгоритм, подобный приведенному ниже, где каждый поток обрабатывает разные буквы (для всех строк во входных данных). В результате каждый поток имеет свой собственный независимый счетчик, поэтому нет конфликтов обновления и нет окончательного шага для объединения результатов. Однако нам требуется предварительная обработка, чтобы обнаружить "набор уникальных букв", и этот шаг имеет ту же проблему конфликта. (На практике вы, вероятно, знаете заранее весь набор символов, например, алфавиты, а затем можете просто создать 26 потоков для обработки az и обойти эту проблему.) В любом случае, по-видимому, в основном вопрос состоит в том, чтобы исследовать, "как писать F#". асинхронный код для разделения работы между потоками ", поэтому приведенный ниже код демонстрирует это.

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

Я действительно "полностью изменил алгоритм", главным образом потому, что исходный алгоритм, который вы использовали, не особенно подходит для прямого распараллеливания данных из-за конфликта обновления. В зависимости от того, что именно вы собираетесь изучать, этот ответ может быть или не быть особенно удовлетворительным для вас.

Вы можете написать это так:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

и распараллелить его только с двумя дополнительными символами, чтобы использовать PSeq модуль из FSharp.PowerPack.Parallel.Seq DLL вместо обычной Seq модуль:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

Например, время, затрачиваемое на вычисление частот из Библии короля Джеймса 5,5 Мб, падает с 4,75 до 0,66 с. Это ускорение в 7,2 раза на этом 8-ядерном компьютере.

Параллель - это не то же самое, что асинхронность, как объясняет Дон Сайм.

Итак, IMO, вам лучше использовать PLINQ для распараллеливания.

Я плохо говорю на F#, но я могу решить это. Подумайте об использовании карты / уменьшить:

пусть n = card (Σ) - количество символов σ в алфавите Σ.

Этап карты:

Возникает n процессов, где назначением i-го процесса является подсчет количества вхождений символа σi во всем входном векторе.

Уменьшить этап:

Соберите сумму для каждого из n процессов по порядку. Этот вектор ваши результаты.

Теперь эта версия не приводит к каким-либо улучшениям по сравнению с последовательной версией; Я подозреваю, что здесь есть скрытая зависимость, которая затрудняет параллелизацию по своей сути, но я слишком устала и потеряла сознание, чтобы доказать это сегодня вечером.

Другие вопросы по тегам