Используя Parallel Linq Extensions для объединения двух последовательностей, как можно сначала получить самые быстрые результаты?

Допустим, у меня есть две последовательности, возвращающие целые числа от 1 до 5.

Первый возвращает 1, 2 и 3 очень быстро, но 4 и 5 занимают 200 мс каждый.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

Вторая возвращает 1, 2 и 3 с задержкой 200 мс, но 4 и 5 возвращаются быстро.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

Объединение обеих этих последовательностей дает мне только цифры от 1 до 5.

FastFirst().Union(SlowFirst());

Я не могу гарантировать, какой из двух методов имеет задержки в какой момент, поэтому порядок выполнения не может гарантировать решение для меня. Поэтому я хотел бы распараллелить объединение, чтобы минимизировать (искусственную) задержку в моем примере.

Реальный сценарий: у меня есть кеш, который возвращает некоторые сущности, и источник данных, который возвращает все сущности. Я хотел бы иметь возможность возвращать итератор из метода, который внутренне распараллеливает запрос к кешу и источнику данных, чтобы кэшированные результаты давали как можно быстрее.

Примечание 1: я понимаю, что это все еще тратит впустую циклы процессора; Я не спрашиваю, как я могу предотвратить итерации последовательностей по своим медленным элементам, просто как я могу объединить их как можно быстрее.

Обновление 1: я настроил отличный ответ от Ачитаки-сан на прием нескольких производителей и использование ContinueWhenAll для установки CompleteAdding BlockingCollection только один раз. Я просто поместил это здесь, так как это потеряло бы в отсутствии форматирования комментариев. Любые дальнейшие отзывы будут отличными!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var result in producer)
    {
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in result)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}

2 ответа

Решение

Взгляните на это. Первый метод просто возвращает все в порядке поступления результатов. Второй проверяет уникальность. Думаю, если вы их зацепите, вы получите желаемый результат.

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
        var producer1Done = false;
        var producer2Done = false;

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer1)
            {
                resultsQueue.Add(product);
            }
            producer1Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer2)
            {
                resultsQueue.Add(product);
            }
            producer2Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        HashSet<TResult> knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            if (knownResults.Contains(result)) {continue;}
            knownResults.Add(result);
            yield return result;
        }
    }
}

Кэш будет почти мгновенным по сравнению с извлечением из базы данных, поэтому вы можете сначала прочитать из кэша и вернуть эти элементы, затем прочитать из базы данных и вернуть элементы, кроме тех, которые были найдены в кэше.

Если вы попытаетесь распараллелить это, вы добавите много сложностей, но получите довольно небольшой выигрыш.

Редактировать:

Если нет предсказуемой разницы в скорости источников, вы можете запустить их в потоках и использовать синхронизированный хэш-набор для отслеживания того, какие элементы вы уже получили, поместить новые элементы в очередь и позволить основному потоку прочитать из очереди:

public static IEnumerable<TItem> GetParallel<TItem, TKey>(Func<TItem, TKey> getKey, params IEnumerable<TItem>[] sources) {
  HashSet<TKey> found = new HashSet<TKey>();
  List<TItem> queue = new List<TItem>();
  object sync = new object();
  int alive = 0;
  object aliveSync = new object();
  foreach (IEnumerable<TItem> source in sources) {
    lock (aliveSync) {
      alive++;
    }
    new Thread(s => {
      foreach (TItem item in s as IEnumerable<TItem>) {
        TKey key = getKey(item);
        lock (sync) {
          if (found.Add(key)) {
            queue.Add(item);
          }
        }
      }
      lock (aliveSync) {
        alive--;
      }
    }).Start(source);
  }
  while (true) {
    lock (sync) {
      if (queue.Count > 0) {
        foreach (TItem item in queue) {
          yield return item;
        }
        queue.Clear();
      }
    }
    lock (aliveSync) {
      if (alive == 0) break;
    }
    Thread.Sleep(100);
  }
}

Тестовый поток:

public static IEnumerable<int> SlowRandomFeed(Random rnd) {
  int[] values = new int[100];
  for (int i = 0; i < 100; i++) {
    int pos = rnd.Next(i + 1);
    values[i] = i;
    int temp = values[pos];
    values[pos] = values[i];
    values[i] = temp;
  }
  foreach (int value in values) {
    yield return value;
    Thread.Sleep(rnd.Next(200));
  }
}

Тестовое задание:

Random rnd = new Random();
foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) {
  Console.Write("{0:0000 }", item);
}
Другие вопросы по тегам