Выполнение части кода ровно 1 раз внутри Parallel.ForEach

Мне нужно запросить в моей компании CRM Solution(Oracle прямо сейчас) для наших 600 тыс. Пользователей и обновить их там, если они существуют, или создать их, если их нет. Чтобы узнать, существует ли пользователь прямо сейчас, я использую сторонний WS. А с 600k пользователями это может быть настоящей болью из-за времени, которое требуется каждый раз, чтобы получить ответ (около 1 секунды). Так что мне удалось изменить свой код для использования Parallel.ForEachзапрашивая каждую запись всего за 0,35 секунды и добавляя ее в List<User> записей, которые будут созданы или будут обновлены (прямо сейчас это довольно глупо, поэтому мне нужно разделить их на 2 списка и вызвать 2 различных метода WS).

Мой код отлично работал до многопоточности, но занимал слишком много времени. Проблема заключается в том, что я не могу сделать пакет слишком большим или получаю тайм-аут при попытке обновить или создать через веб-сервис. Поэтому я отправляю им около 500 записей одновременно, и когда он запускает критическую часть кода, он выполняется много раз.

Parallel.ForEach(boDS.USERS.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = -1 }, row =>
{
    ...
    user = null;
    user = QueryUserById(row["USER_ID"].Trim());

    if (user == null)
    {
        isUpdate = false;
        gObject.ID = new ID();
    }
    else
    {
        isUpdate = true;
        gObject.ID = user.ID;
    }

    ... fill user attributes as generic fields ...

    gObject.GenericFields = listGenericFields.ToArray();

    if (isUpdate)
        listUserUpdate.Add(gObject);
    else
        listUserCreate.Add(gObject);

    if (i == batchSize - 1 || i == (boDS.USERS.Rows.Count - 1))
    {               
        UpdateProcessingOptions upo = new UpdateProcessingOptions();
        CreateProcessingOptions cpo = new CreateProcessingOptions();
        upo.SuppressExternalEvents = false;
        upo.SuppressRules = false;
        cpo.SuppressExternalEvents = false;
        cpo.SuppressRules = false;

        RNObject[] results = null;

        // <Critical_code>

        if (listUserCreate.Count > 0)
        {
            results = _service.Create(_clientInfoHeader, listUserCreate.ToArray(), cpo);
        }
        if (listUserUpdate.Count > 0)
        {
            _service.Update(_clientInfoHeader, listUserUpdate.ToArray(), upo);
        }
        // </Critical_code>

        listUserUpdate = new List<RNObject>();
        listUserCreate = new List<RNObject>();
    }
    i++;
});

Я думал об использовании lock или же mutex, но это не поможет мне, так как они просто будут ждать, чтобы выполнить потом. Мне нужно какое-то решение, чтобы выполнить только ОДИН РАЗ только в ОДНОМ потоке этой части кода. Является ли это возможным? Кто-нибудь может поделиться светом?

Спасибо и всего наилучшего, Леандро

2 ответа

Как вы указали в комментариях, вы объявляете переменные вне тела цикла. Вот откуда берутся ваши расы.

Давайте возьмем переменную listUserUpdate например. Доступ к нему происходит случайным образом при параллельном выполнении потоков. Пока один поток все еще добавляет к нему, например, в listUserUpdate.Add(gObject); другой поток уже может сбрасывать списки в listUserUpdate = new List<RNObject>(); или перечисляя это в listUserUpdate.ToArray(),

Вам действительно нужно реорганизовать этот код в

  • заставить каждый цикл работать как можно более независимым друг от друга, перемещая переменные внутри тела цикла и
  • получить доступ к данным синхронизированным способом, используя блокировки и / или одновременные коллекции

Вы можете использовать дважды проверенный шаблон блокировки. Это обычно используется для синглетонов, но вы не делаете синглтон здесь, так что общие синглтоны, такие как Lazy<T> не применяются.

Это работает так:

  1. Разделите ваши общие данные в некоторый класс:

    class QuerySharedData { // All the write-once-read-many fields that need to be shared between threads public QuerySharedData() { // Compute all the write-once-read-many fields. Or use a static Create method if that's handy. } }

  2. В вашем внешнем классе добавьте следующее:

    object padlock; volatile QuerySharedData data

  3. В делегате обратного вызова вашего потока сделайте это:

    if (data == null) { lock (padlock) { if (data == null) { data = new QuerySharedData(); // this does all the work to initialize the shared fields } } } var localData = data

Затем используйте данные общего запроса из localData Группируя данные общего запроса в подчиненный класс, вы избегаете необходимости превращения его отдельных полей в изменчивые.

Больше о volatile здесь: Часть 4: Продвинутые потоки.

Обновите мое предположение здесь, что все классы и поля, удерживаемые QuerySharedData только для чтения после инициализации. Если это не так, например, если вы инициализируете список один раз, но добавляете его во многие потоки, этот шаблон не будет работать для вас. Вам придется рассмотреть возможность использования таких вещей, как Thread-Safe Collections.

Другие вопросы по тегам