ConcurrentDictionary GetOrAdd async
Я хочу использовать что-то вроде GetOrAdd
с ConcurrentDictionary
в качестве кэша для веб-службы. Есть ли асинхронная версия этого словаря? GetOrAdd будет делать веб-запрос, используя HttpClient
, поэтому было бы неплохо, если бы была версия этого словаря, в которой GetOrAdd был асинхронным.
Чтобы устранить некоторую путаницу, содержимое словаря будет ответом на вызов веб-службы.
ConcurrentDictionary<string, Response> _cache = new ConcurrentDictionary<string, Response>();
var response = _cache.GetOrAdd("id", (x) => { _httpClient.GetAsync(x).GetAwaiter().GetResponse();} )
5 ответов
GetOrAdd
не станет асинхронной операцией, потому что доступ к значению словаря не является длительной операцией.
Однако вы можете просто хранить задачи в словаре, а не материализованный результат. Любой, кому нужны результаты, может дождаться этой задачи.
Однако вам также необходимо убедиться, что операция запускается только один раз, а не несколько раз. Чтобы некоторые операции выполнялись только один раз, а не несколько раз, необходимо также добавить Lazy
:
ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();
var response = await _cache.GetOrAdd("id", url => new Lazy<Response>(_httpClient.GetAsync(x))).Value;
GetOrAdd
метод не так велик, чтобы использовать для этой цели. Так как он не гарантирует, что фабрика запускается только один раз, единственная цель, которую он имеет, - это небольшая оптимизация (незначительная, поскольку добавления в любом случае редки) в том, что ей не нужно хешировать и дважды находить правильный сегмент (что может произойти дважды, если Вы получаете и устанавливаете с двумя отдельными вызовами).
Я бы посоветовал сначала проверить кеш, если вы не нашли значение в кеше, а затем ввести некую форму критического раздела (блокировка, семафор и т. Д.), Перепроверить кеш, если он все еще отсутствует, затем извлечь значение и вставьте в кеш.
Это гарантирует, что ваш запасной магазин будет поражен только один раз; даже если несколько запросов одновременно пропускают кэш, только первый запрос получит значение, остальные будут ожидать семафор, а затем рано возвращаться, так как они повторно проверяют кэш в критическом разделе.
Код Psuedo (с использованием SemaphoreSlim со счетчиком 1, поскольку вы можете ожидать его асинхронно):
async Task<TResult> GetAsync(TKey key)
{
// Try to fetch from catch
if (cache.TryGetValue(key, out var result)) return result;
// Get some resource lock here, for example use SemaphoreSlim
// which has async wait function:
await semaphore.WaitAsync();
try
{
// Try to fetch from cache again now that we have entered
// the critical section
if (cache.TryGetValue(key, out result)) return result;
// Fetch data from source (using your HttpClient or whatever),
// update your cache and return.
return cache[key] = await FetchFromSourceAsync(...);
}
finally
{
semaphore.Release();
}
}
Попробуйте этот метод расширения:
/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
this ConcurrentDictionary<TKey,TResult> dict,
TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
if (dict.TryGetValue(key, out TResult resultingValue))
{
return resultingValue;
}
var newValue = await asyncValueFactory(key);
return dict.GetOrAdd(key, newValue);
}
Вместо
dict.GetOrAdd(key,key=>something(key))
, ты используешь
await dict.GetOrAddAsync(key,async key=>await something(key))
. Очевидно, в этой ситуации вы просто пишете это как
await dict.GetOrAddAsync(key,something)
, но я хотел уточнить.
Что касается опасений по поводу сохранения порядка операций, у меня есть следующие замечания:
- Использование обычного GetOrAdd даст тот же эффект, если вы посмотрите на то, как он реализован. Я буквально использовал тот же код и заставил его работать для асинхронности. Ссылка говорит
делегат valueFactory вызывается за пределами блокировки, чтобы избежать проблем, которые могут возникнуть при выполнении неизвестного кода под блокировкой. Следовательно, GetOrAdd не является атомарным по отношению ко всем другим операциям в классе ConcurrentDictionary<TKey,TValue>.
- SyncRoot не поддерживается в ConcurrentDictionary, они используют внутренний механизм блокировки, поэтому блокировка на нем невозможна. Однако использование собственного механизма блокировки работает только для этого метода расширения. Если вы используете другой поток (например, с помощью GetOrAdd), вы столкнетесь с той же проблемой.
Вероятно, использование выделенного кеша памяти (например, или старых классов или этой сторонней библиотеки) должно быть предпочтительнее использования простого. Если вам действительно не нужны часто используемые функции, такие как истечение срока действия на основе времени, сжатие на основе размера, автоматическое удаление записей, которые зависят от других записей, срок действия которых истек, или зависит от изменяемых внешних ресурсов (таких как файлы, базы данных и т. д.). При этом следует отметить, чтоновыхMemoryCache
все еще может потребоваться некоторая работа для правильной обработки асинхронных делегатов, поскольку его стандартное поведение не идеально .
Ниже приведен пользовательский метод расширения
GetOrAddAsync
за
ConcurrentDictionary
у которых есть
Task<TValue>
ценности. Он принимает фабричный метод и гарантирует, что метод будет вызываться не более одного раза. Это также гарантирует, что неудачные задачи будут удалены из словаря. Эта реализация оптимизирована для случая, когда получение существующей задачи происходит часто, а создание новой — редко.
/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
Func<TKey, Task<TValue>> valueFactory)
{
if (!source.TryGetValue(key, out var currentTask))
{
Task<TValue> newTask = null;
var newTaskTask = new Task<Task<TValue>>(async () =>
{
try { return await valueFactory(key).ConfigureAwait(false); }
catch
{
((ICollection<KeyValuePair<TKey, Task<TValue>>>)source)
.Remove(new KeyValuePair<TKey, Task<TValue>>(key, newTask));
//source.TryRemove(KeyValuePair.Create(key, newTask)); // .NET 5
throw;
}
});
newTask = newTaskTask.Unwrap();
currentTask = source.GetOrAdd(key, newTask);
if (currentTask == newTask)
newTaskTask.RunSynchronously(TaskScheduler.Default);
}
return currentTask;
}
Пример использования:
var cache = new ConcurrentDictionary<string, Task<HttpResponseMessage>>();
var response = await cache.GetOrAddAsync("https://stackoverflow.com", async url =>
{
return await _httpClient.GetAsync(url);
});
Для удаления сбойных задач эта реализация использует явно реализованныйICollection<T>.Remove
API. Дополнительную информацию об этом API можно найти здесь . Начиная с .NET 5, новыйTryRemove(KeyValuePair<TKey, TValue> item)
Вместо этого можно использовать метод.
Кстати, если требуется высочайшая производительность, вы можете взглянуть на стороннюю библиотеку BitFaster.Caching . Я никогда не использовал его лично, но диаграммы с его тестами выглядят впечатляюще.
Я решил это несколько лет назад, и родилась TPL. Я сижу в кафе и у меня нет исходного кода, но получилось примерно так.
Это не строгий ответ, но он может вдохновить вас на собственное решение. Важно вернуть значение, которое было только что добавлено или уже существует, вместе с логическим значением, чтобы вы могли разветвить выполнение.
Дизайн позволяет легко разделить логику победы в гонке и логику проигрыша.
public bool TryAddValue(TKey key, TValue value, out TValue contains)
{
// guards etc.
while (true)
{
if (this.concurrentDic.TryAdd(key, value))
{
contains = value;
return true;
}
else if (this.concurrentDic.TryGetValue(key, out var existing))
{
contains = existing;
return false;
}
else
{
// Slipped down the rare path. The value was removed between the
// above checks. I think just keep trying because we must have
// been really unlucky.
// Note this spinning will cause adds to execute out of
// order since a very unlucky add on a fast moving collection
// could in theory be bumped again and again before getting
// lucky and getting its value added, or locating existing.
// A tiny random sleep might work. Experiment under load.
}
}
}
Это может быть сделано в виде расширения для
ConcurrentDictionary
или быть методом сам по себе, вашим собственным кешем или чем-то, использующим блокировки.
Возможно,
GetOrAdd(K,V)
может использоваться с
Object.ReferenceEquals()
чтобы проверить, добавлено оно или нет, вместо спин-дизайна.
Честно говоря, приведенный выше код не является целью моего ответа. Сила заключается в простом дизайне сигнатуры метода и в том, как она обеспечивает следующее:
static readonly ConcurrentDictionary<string, Task<Task<Thing>>> tasks = new();
//
var newTask = new Task<Task<Thing>>(() => GetThingAsync(thingId));
if (this.tasks.TryAddValue(thingId, newTask, out var task))
{
task.Start();
}
var thingTask = await task;
var thing = await thingTask;
Немного странно, как нужно хранить a (если ваша работа асинхронна), и нужно учитывать распределение неиспользуемых s.
Я думаю, что это позор, Microsoft не отправила свою потокобезопасную коллекцию с этим методом или не извлекла интерфейс «параллельной коллекции».
Моей реальной реализацией был кеш со сложными истекающими внутренними коллекциями и прочим. Я думаю, вы могли бы создать подкласс .NET
Task
класс и добавить
CreatedAt
имущество для оказания помощи при выселении.
Отказ от ответственности Я вообще не пробовал это, это не в голове, но я использовал такой дизайн в сверхвысокопроизводительном приложении в 2009 году.