Используя Parallel Linq Extensions для объединения двух последовательностей, как можно сначала получить самые быстрые результаты?
Допустим, у меня есть две последовательности, возвращающие целые числа от 1 до 5.
Первый возвращает 1, 2 и 3 очень быстро, но 4 и 5 занимают 200 мс каждый.
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
Вторая возвращает 1, 2 и 3 с задержкой 200 мс, но 4 и 5 возвращаются быстро.
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
Объединение обеих этих последовательностей дает мне только цифры от 1 до 5.
FastFirst().Union(SlowFirst());
Я не могу гарантировать, какой из двух методов имеет задержки в какой момент, поэтому порядок выполнения не может гарантировать решение для меня. Поэтому я хотел бы распараллелить объединение, чтобы минимизировать (искусственную) задержку в моем примере.
Реальный сценарий: у меня есть кеш, который возвращает некоторые сущности, и источник данных, который возвращает все сущности. Я хотел бы иметь возможность возвращать итератор из метода, который внутренне распараллеливает запрос к кешу и источнику данных, чтобы кэшированные результаты давали как можно быстрее.
Примечание 1: я понимаю, что это все еще тратит впустую циклы процессора; Я не спрашиваю, как я могу предотвратить итерации последовательностей по своим медленным элементам, просто как я могу объединить их как можно быстрее.
Обновление 1: я настроил отличный ответ от Ачитаки-сан на прием нескольких производителей и использование ContinueWhenAll для установки CompleteAdding BlockingCollection только один раз. Я просто поместил это здесь, так как это потеряло бы в отсутствии форматирования комментариев. Любые дальнейшие отзывы будут отличными!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}
2 ответа
Взгляните на это. Первый метод просто возвращает все в порядке поступления результатов. Второй проверяет уникальность. Думаю, если вы их зацепите, вы получите желаемый результат.
public static class Class1
{
public static IEnumerable<TResult> SelectAsync<TResult>(
IEnumerable<TResult> producer1,
IEnumerable<TResult> producer2,
int capacity)
{
var resultsQueue = new BlockingCollection<TResult>(capacity);
var producer1Done = false;
var producer2Done = false;
Task.Factory.StartNew(() =>
{
foreach (var product in producer1)
{
resultsQueue.Add(product);
}
producer1Done = true;
if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
});
Task.Factory.StartNew(() =>
{
foreach (var product in producer2)
{
resultsQueue.Add(product);
}
producer2Done = true;
if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
});
return resultsQueue.GetConsumingEnumerable();
}
public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
{
HashSet<TResult> knownResults = new HashSet<TResult>();
foreach (TResult result in source)
{
if (knownResults.Contains(result)) {continue;}
knownResults.Add(result);
yield return result;
}
}
}
Кэш будет почти мгновенным по сравнению с извлечением из базы данных, поэтому вы можете сначала прочитать из кэша и вернуть эти элементы, затем прочитать из базы данных и вернуть элементы, кроме тех, которые были найдены в кэше.
Если вы попытаетесь распараллелить это, вы добавите много сложностей, но получите довольно небольшой выигрыш.
Редактировать:
Если нет предсказуемой разницы в скорости источников, вы можете запустить их в потоках и использовать синхронизированный хэш-набор для отслеживания того, какие элементы вы уже получили, поместить новые элементы в очередь и позволить основному потоку прочитать из очереди:
public static IEnumerable<TItem> GetParallel<TItem, TKey>(Func<TItem, TKey> getKey, params IEnumerable<TItem>[] sources) {
HashSet<TKey> found = new HashSet<TKey>();
List<TItem> queue = new List<TItem>();
object sync = new object();
int alive = 0;
object aliveSync = new object();
foreach (IEnumerable<TItem> source in sources) {
lock (aliveSync) {
alive++;
}
new Thread(s => {
foreach (TItem item in s as IEnumerable<TItem>) {
TKey key = getKey(item);
lock (sync) {
if (found.Add(key)) {
queue.Add(item);
}
}
}
lock (aliveSync) {
alive--;
}
}).Start(source);
}
while (true) {
lock (sync) {
if (queue.Count > 0) {
foreach (TItem item in queue) {
yield return item;
}
queue.Clear();
}
}
lock (aliveSync) {
if (alive == 0) break;
}
Thread.Sleep(100);
}
}
Тестовый поток:
public static IEnumerable<int> SlowRandomFeed(Random rnd) {
int[] values = new int[100];
for (int i = 0; i < 100; i++) {
int pos = rnd.Next(i + 1);
values[i] = i;
int temp = values[pos];
values[pos] = values[i];
values[i] = temp;
}
foreach (int value in values) {
yield return value;
Thread.Sleep(rnd.Next(200));
}
}
Тестовое задание:
Random rnd = new Random();
foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) {
Console.Write("{0:0000 }", item);
}