Параллельный HashSet<T> в.NET Framework?
У меня есть следующий класс.
class Test{
public HashSet<string> Data = new HashSet<string>();
}
Мне нужно изменить поле "Данные" из разных потоков, поэтому я хотел бы высказать некоторые мнения о моей текущей поточно-безопасной реализации.
class Test{
public HashSet<string> Data = new HashSet<string>();
public void Add(string Val){
lock(Data) Data.Add(Val);
}
public void Remove(string Val){
lock(Data) Data.Remove(Val);
}
}
Есть ли лучшее решение, чтобы перейти непосредственно к полю и защитить его от одновременного доступа несколькими потоками?
8 ответов
Ваша реализация верна. К сожалению,.NET Framework не предоставляет встроенный тип одновременного хэш-набора. Тем не менее, есть некоторые обходные пути.
ConcurrentDictionary (рекомендуется)
Этот первый должен использовать класс ConcurrentDictionary<TKey, TValue>
в пространстве имен System.Collections.Concurrent
, В этом случае значение не имеет смысла, поэтому мы можем использовать простой byte
(1 байт в памяти).
private ConcurrentDictionary<string, byte> _data;
Это рекомендуемый вариант, потому что тип является потокобезопасным и предоставляет вам те же преимущества, что и HashSet<T>
кроме ключа и значения разные объекты.
Источник: Социальный MSDN
ConcurrentBag
Если вы не против повторяющихся записей, вы можете использовать класс ConcurrentBag<T>
в том же пространстве имен предыдущего класса.
private ConcurrentBag<string> _data;
Само-реализация
Наконец, как и вы, вы можете реализовать свой собственный тип данных, используя блокировку или другие способы, которые.NET предоставляет вам для обеспечения безопасности потоков. Вот отличный пример: Как реализовать ConcurrentHashSet в.Net
Единственным недостатком этого решения является то, что тип HashSet<T>
официально не одновременный доступ, даже для операций чтения.
Я цитирую код связанного поста (первоначально написанный Ben Mosher).
using System.Collections.Generic;
using System.Threading;
namespace BlahBlah.Utilities
{
public class ConcurrentHashSet<T> : IDisposable
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();
#region Implementation of ICollection<T> ...ish
public bool Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _hashSet.Count;
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
}
#endregion
#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
~ConcurrentHashSet()
{
Dispose(false);
}
#endregion
}
}
РЕДАКТИРОВАТЬ: переместить методы блокировки входа в try
блоки, поскольку они могут генерировать исключение и выполнять инструкции, содержащиеся в finally
блоки.
Вместо того, чтобы обернуть ConcurrentDictionary
или запирание HashSet
Я создал реальный ConcurrentHashSet
основанный на ConcurrentDictionary
,
Эта реализация поддерживает основные операции для каждого элемента без HashSet
Операции с наборами, поскольку они имеют меньше смысла в параллельных сценариях IMO:
var concurrentHashSet = new ConcurrentHashSet<string>(
new[]
{
"hamster",
"HAMster",
"bar",
},
StringComparer.OrdinalIgnoreCase);
concurrentHashSet.TryRemove("foo");
if (concurrentHashSet.Contains("BAR"))
{
Console.WriteLine(concurrentHashSet.Count);
}
Выход: 2
Вы можете получить его от NuGet здесь и посмотреть источник на GitHub здесь.
Поскольку никто другой не упомянул об этом, я предложу альтернативный подход, который может подходить или не подходить для вашей конкретной цели:
Неизменные коллекции Microsoft
Из сообщения в блоге команды MS позади:
Хотя создание и запуск одновременно проще, чем когда-либо, все еще существует одна из фундаментальных проблем: изменяемое общее состояние. Чтение из нескольких потоков, как правило, очень просто, но после обновления состояния становится намного сложнее, особенно в проектах, требующих блокировки.
Альтернативой блокировке является использование неизменного состояния. Неизменяемые структуры данных гарантированно никогда не изменятся и, таким образом, могут свободно передаваться между различными потоками, не беспокоясь о наступлении на чужие пальцы.
Этот дизайн создает новую проблему: как управлять изменениями состояния, не копируя каждый раз все состояние? Это особенно сложно, когда участвуют коллекции.
Это где неизменные коллекции входят.
Эти коллекции включают ImmutableHashSet
Спектакль
Поскольку неизменяемые коллекции используют древовидные структуры данных для обеспечения структурного совместного использования, их характеристики производительности отличаются от изменяемых коллекций. При сравнении с изменяемой блокировкой коллекции результаты будут зависеть от конкуренции за блокировку и схем доступа. Однако взято из другого поста блога об неизменных коллекциях:
Q: Я слышал, что неизменные коллекции медленные. Это что-то другое? Могу ли я использовать их, когда важны производительность или память?
О: Эти неизменяемые коллекции были настроены так, чтобы иметь конкурентоспособные характеристики производительности с изменяемыми коллекциями при одновременном распределении памяти. В некоторых случаях они почти так же быстры, как изменяемые коллекции, как алгоритмически, так и в реальном времени, иногда даже быстрее, в то время как в других случаях они алгоритмически более сложны. Однако во многих случаях разница будет незначительной. Как правило, вы должны использовать самый простой код для выполнения работы, а затем настраивать производительность по мере необходимости. Неизменяемые коллекции помогают вам писать простой код, особенно когда нужно учитывать безопасность потоков.
Другими словами, во многих случаях разница не будет заметна, и вы должны пойти на более простой выбор - который для одновременных наборов будет использовать ImmutableHashSet<T>
, так как у вас нет существующей блокировки изменяемой реализации!:-)
Хитрая часть о создании ISet<T>
одновременным является то, что заданные методы (объединение, пересечение, разность) имеют итеративный характер. По крайней мере, вам нужно перебрать все n членов одного из наборов, участвующих в операции, при этом блокируя оба набора.
Вы теряете преимущества ConcurrentDictionary<T,byte>
когда вы должны заблокировать весь набор во время итерации. Без блокировки эти операции не являются потокобезопасными.
Учитывая дополнительные накладные расходы ConcurrentDictionary<T,byte>
, вероятно, разумнее использовать более легкий вес HashSet<T>
и просто окружить все замками.
Если вам не нужны заданные операции, используйте ConcurrentDictionary<T,byte>
и просто использовать default(byte)
в качестве значения при добавлении ключей.
Решения, основанные наConcurrentDictionary<TKey, TValue>
обычно хорошо масштабируются; Однако, если вам нужно получить доступ кCount
,Keys
илиValues
свойства или вы перебираете элементы, это становится хуже, чем одна блокирующая коллекция. Это связано с тем, что используется группа блокировок (по умолчанию их количество зависит от количества ядер ЦП), и доступ к этим членам приводит к получению всех блокировок, поэтому они имеют тем хуже производительность, чем больше ядер у вашего ЦП.
Другой ответ предлагает использовать неизменяемые коллекции. Хотя они потокобезопасны, они работают хорошо только в том случае, если вы редко добавляете к ним новые элементы (что всегда создает новый экземпляр, хотя и пытается наследовать как можно больше узлов от предыдущего экземпляра), но даже в этом случае они обычно имеют более низкая производительность.
В итоге я нашел (которое я также применил к своемупозже): в отличие отConcurrentDictionary
, я использую только одну блокировку, но только временно: время от времени новинки перемещаются в полностью безблокировочное хранилище, где даже удаление и повторное добавление одних и тех же ключей становятся безблокировочными, следовательно, очень быстрыми. Никакие таймеры не используются для выполнения этих слияний. Слияние с хранилищем без блокировки происходит только тогда, когда к хранилищу с блокировкой необходимо получить доступ, и оно было создано «достаточно давно» назад, что настраивается .
Примечание. См. сравнительную таблицу производительности в разделе «Примечания»другое решение
ThreadSafeDictionary<TKey TValue>
класса (имеет те же характеристики, что иThreadSafeHashSet<T>
), чтобы убедиться, что это хороший выбор для ваших нужд. Здесь вы можете найти исходный код тестов производительности, если хотите выполнить их самостоятельно.
Исходники доступны здесь , а также вы можете скачать их в виде пакета NuGet .
Я предпочитаю полные решения, поэтому я сделал это: имейте в виду, что мой счетчик реализован по-другому, потому что я не понимаю, почему нужно запретить читать хэш-набор при попытке подсчитать его значения.
@ Zen, спасибо, что начали.
[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();
public ConcurrentHashSet()
{
}
public ConcurrentHashSet(IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(comparer);
}
public ConcurrentHashSet(IEnumerable<T> collection)
{
_hashSet = new HashSet<T>(collection);
}
public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(collection, comparer);
}
protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
{
_hashSet = new HashSet<T>();
// not sure about this one really...
var iSerializable = _hashSet as ISerializable;
iSerializable.GetObjectData(info, context);
}
#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
public IEnumerator<T> GetEnumerator()
{
return _hashSet.GetEnumerator();
}
~ConcurrentHashSet()
{
Dispose(false);
}
public void OnDeserialization(object sender)
{
_hashSet.OnDeserialization(sender);
}
public void GetObjectData(SerializationInfo info, StreamingContext context)
{
_hashSet.GetObjectData(info, context);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
#endregion
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
_hashSet.Add(item);
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void UnionWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.UnionWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void IntersectWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.IntersectWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void ExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.ExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void SymmetricExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
_hashSet.SymmetricExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsProperSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsProperSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Overlaps(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Overlaps(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool SetEquals(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.SetEquals(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
bool ISet<T>.Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterWriteLock();
try
{
_hashSet.CopyTo(array, arrayIndex);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public int Count
{
get
{
_lock.EnterWriteLock();
try
{
return _hashSet.Count;
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
}
Я обнаружил, что ни простой блокировки методов добавления и удаления System.Collections.Generic.HashSet, ни упаковки ConcurrentDictionary фреймворка недостаточно в сценариях с «высокой пропускной способностью», требующих хорошей производительности.
Достаточно хорошей производительности уже можно достичь, используя эту простую идею:
public class ExampleHashSet<T>
{
const int ConcurrencyLevel = 124;
const int Lower31BitMask = 0x7FFFFFFF;
HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
IEqualityComparer<T> comparer;
public ExampleHashSet()
{
comparer = EqualityComparer<T>.Default;
for(int i = 0; i < ConcurrencyLevel; i++)
sets[i] = new HashSet<T>();
}
public bool Add(T item)
{
int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
lock(sets[hash])
{
return sets[hash].Add(item);
}
}
public bool Remove(T item)
{
int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
lock(sets[hash])
{
return sets[hash].Remove(item);
}
}
// further methods ...
}
Системный HashSet обернут, но, в отличие от других ответов, мы удерживаем блокировки на нескольких HashSet. Различные потоки могут «работать» с разными HashSet, уменьшая общее время ожидания.
Эту идею можно обобщить и реализовать непосредственно в самом HashSet(удерживание блокировок на бакетах вместо блокирования полных наборов). Пример можно найти здесь .
Для наиболее распространенных задач должно быть достаточно следующего производного класса. Чтобы получить полноценный хеш-набор, вы все равно можете использоватьConcurrentDictionary
в качестве вспомогательного поля реализуйте все интерфейсы и передавайте вызовы в словарь.
Простая реализация
public class ConcurrentHashSet<T> : ConcurrentDictionary<T, byte>
where T : notnull
{
const byte DummyByte = byte.MinValue;
// For convenience, we add HashSet equivalent APIs here...
public bool Contains(T item) => ContainsKey(item);
public bool Add(T item) => TryAdd(item, DummyByte);
public bool Remove(T item) => TryRemove(item, out _);
}
Ограничения
- Нет поддержки для
null
ценности. - Будет вести себя как словарь. Например, перечисление, сериализация...
-
HashSet
определенные интерфейсы отсутствуют.