Параллельный HashSet<T> в.NET Framework?

У меня есть следующий класс.

class Test{
    public HashSet<string> Data = new HashSet<string>();
}

Мне нужно изменить поле "Данные" из разных потоков, поэтому я хотел бы высказать некоторые мнения о моей текущей поточно-безопасной реализации.

class Test{
    public HashSet<string> Data = new HashSet<string>();

    public void Add(string Val){
            lock(Data) Data.Add(Val);
    }

    public void Remove(string Val){
            lock(Data) Data.Remove(Val);
    }
}

Есть ли лучшее решение, чтобы перейти непосредственно к полю и защитить его от одновременного доступа несколькими потоками?

8 ответов

Решение

Ваша реализация верна. К сожалению,.NET Framework не предоставляет встроенный тип одновременного хэш-набора. Тем не менее, есть некоторые обходные пути.

ConcurrentDictionary (рекомендуется)

Этот первый должен использовать класс ConcurrentDictionary<TKey, TValue> в пространстве имен System.Collections.Concurrent, В этом случае значение не имеет смысла, поэтому мы можем использовать простой byte (1 байт в памяти).

private ConcurrentDictionary<string, byte> _data;

Это рекомендуемый вариант, потому что тип является потокобезопасным и предоставляет вам те же преимущества, что и HashSet<T> кроме ключа и значения разные объекты.

Источник: Социальный MSDN

ConcurrentBag

Если вы не против повторяющихся записей, вы можете использовать класс ConcurrentBag<T> в том же пространстве имен предыдущего класса.

private ConcurrentBag<string> _data;

Само-реализация

Наконец, как и вы, вы можете реализовать свой собственный тип данных, используя блокировку или другие способы, которые.NET предоставляет вам для обеспечения безопасности потоков. Вот отличный пример: Как реализовать ConcurrentHashSet в.Net

Единственным недостатком этого решения является то, что тип HashSet<T> официально не одновременный доступ, даже для операций чтения.

Я цитирую код связанного поста (первоначально написанный Ben Mosher).

using System.Collections.Generic;
using System.Threading;

namespace BlahBlah.Utilities
{
    public class ConcurrentHashSet<T> : IDisposable
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        private readonly HashSet<T> _hashSet = new HashSet<T>();

        #region Implementation of ICollection<T> ...ish
        public bool Add(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Add(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public void Clear()
        {
            _lock.EnterWriteLock();
            try
            {
                _hashSet.Clear();
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public bool Contains(T item)
        {
            _lock.EnterReadLock();
            try
            {
                return _hashSet.Contains(item);
            }
            finally
            {
                if (_lock.IsReadLockHeld) _lock.ExitReadLock();
            }
        }

        public bool Remove(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Remove(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public int Count
        {
            get
            {
                _lock.EnterReadLock();
                try
                {
                    return _hashSet.Count;
                }
                finally
                {
                    if (_lock.IsReadLockHeld) _lock.ExitReadLock();
                }
            }
        }
        #endregion

        #region Dispose
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
                if (_lock != null)
                    _lock.Dispose();
        }
        ~ConcurrentHashSet()
        {
            Dispose(false);
        }
        #endregion
    }
}

РЕДАКТИРОВАТЬ: переместить методы блокировки входа в try блоки, поскольку они могут генерировать исключение и выполнять инструкции, содержащиеся в finally блоки.

Вместо того, чтобы обернуть ConcurrentDictionary или запирание HashSet Я создал реальный ConcurrentHashSet основанный на ConcurrentDictionary,

Эта реализация поддерживает основные операции для каждого элемента без HashSetОперации с наборами, поскольку они имеют меньше смысла в параллельных сценариях IMO:

var concurrentHashSet = new ConcurrentHashSet<string>(
    new[]
    {
        "hamster",
        "HAMster",
        "bar",
    },
    StringComparer.OrdinalIgnoreCase);

concurrentHashSet.TryRemove("foo");

if (concurrentHashSet.Contains("BAR"))
{
    Console.WriteLine(concurrentHashSet.Count);
}

Выход: 2

Вы можете получить его от NuGet здесь и посмотреть источник на GitHub здесь.

Поскольку никто другой не упомянул об этом, я предложу альтернативный подход, который может подходить или не подходить для вашей конкретной цели:

Неизменные коллекции Microsoft

Из сообщения в блоге команды MS позади:

Хотя создание и запуск одновременно проще, чем когда-либо, все еще существует одна из фундаментальных проблем: изменяемое общее состояние. Чтение из нескольких потоков, как правило, очень просто, но после обновления состояния становится намного сложнее, особенно в проектах, требующих блокировки.

Альтернативой блокировке является использование неизменного состояния. Неизменяемые структуры данных гарантированно никогда не изменятся и, таким образом, могут свободно передаваться между различными потоками, не беспокоясь о наступлении на чужие пальцы.

Этот дизайн создает новую проблему: как управлять изменениями состояния, не копируя каждый раз все состояние? Это особенно сложно, когда участвуют коллекции.

Это где неизменные коллекции входят.

Эти коллекции включают ImmutableHashSet ; и ImmutableList ;.

Спектакль

Поскольку неизменяемые коллекции используют древовидные структуры данных для обеспечения структурного совместного использования, их характеристики производительности отличаются от изменяемых коллекций. При сравнении с изменяемой блокировкой коллекции результаты будут зависеть от конкуренции за блокировку и схем доступа. Однако взято из другого поста блога об неизменных коллекциях:

Q: Я слышал, что неизменные коллекции медленные. Это что-то другое? Могу ли я использовать их, когда важны производительность или память?

О: Эти неизменяемые коллекции были настроены так, чтобы иметь конкурентоспособные характеристики производительности с изменяемыми коллекциями при одновременном распределении памяти. В некоторых случаях они почти так же быстры, как изменяемые коллекции, как алгоритмически, так и в реальном времени, иногда даже быстрее, в то время как в других случаях они алгоритмически более сложны. Однако во многих случаях разница будет незначительной. Как правило, вы должны использовать самый простой код для выполнения работы, а затем настраивать производительность по мере необходимости. Неизменяемые коллекции помогают вам писать простой код, особенно когда нужно учитывать безопасность потоков.

Другими словами, во многих случаях разница не будет заметна, и вы должны пойти на более простой выбор - который для одновременных наборов будет использовать ImmutableHashSet<T>, так как у вас нет существующей блокировки изменяемой реализации!:-)

Хитрая часть о создании ISet<T> одновременным является то, что заданные методы (объединение, пересечение, разность) имеют итеративный характер. По крайней мере, вам нужно перебрать все n членов одного из наборов, участвующих в операции, при этом блокируя оба набора.

Вы теряете преимущества ConcurrentDictionary<T,byte> когда вы должны заблокировать весь набор во время итерации. Без блокировки эти операции не являются потокобезопасными.

Учитывая дополнительные накладные расходы ConcurrentDictionary<T,byte>, вероятно, разумнее использовать более легкий вес HashSet<T> и просто окружить все замками.

Если вам не нужны заданные операции, используйте ConcurrentDictionary<T,byte> и просто использовать default(byte) в качестве значения при добавлении ключей.

Решения, основанные наConcurrentDictionary<TKey, TValue>обычно хорошо масштабируются; Однако, если вам нужно получить доступ кCount,KeysилиValuesсвойства или вы перебираете элементы, это становится хуже, чем одна блокирующая коллекция. Это связано с тем, что используется группа блокировок (по умолчанию их количество зависит от количества ядер ЦП), и доступ к этим членам приводит к получению всех блокировок, поэтому они имеют тем хуже производительность, чем больше ядер у вашего ЦП.

Другой ответ предлагает использовать неизменяемые коллекции. Хотя они потокобезопасны, они работают хорошо только в том случае, если вы редко добавляете к ним новые элементы (что всегда создает новый экземпляр, хотя и пытается наследовать как можно больше узлов от предыдущего экземпляра), но даже в этом случае они обычно имеют более низкая производительность.

В итоге я нашел (которое я также применил к своемупозже): в отличие отConcurrentDictionary, я использую только одну блокировку, но только временно: время от времени новинки перемещаются в полностью безблокировочное хранилище, где даже удаление и повторное добавление одних и тех же ключей становятся безблокировочными, следовательно, очень быстрыми. Никакие таймеры не используются для выполнения этих слияний. Слияние с хранилищем без блокировки происходит только тогда, когда к хранилищу с блокировкой необходимо получить доступ, и оно было создано «достаточно давно» назад, что настраивается .

Примечание. См. сравнительную таблицу производительности в разделе «Примечания»другое решениеThreadSafeDictionary<TKey TValue>класса (имеет те же характеристики, что иThreadSafeHashSet<T>), чтобы убедиться, что это хороший выбор для ваших нужд. Здесь вы можете найти исходный код тестов производительности, если хотите выполнить их самостоятельно.

Исходники доступны здесь , а также вы можете скачать их в виде пакета NuGet .

Я предпочитаю полные решения, поэтому я сделал это: имейте в виду, что мой счетчик реализован по-другому, потому что я не понимаю, почему нужно запретить читать хэш-набор при попытке подсчитать его значения.

@ Zen, спасибо, что начали.

[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly HashSet<T> _hashSet = new HashSet<T>();

    public ConcurrentHashSet()
    {
    }

    public ConcurrentHashSet(IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(comparer);
    }

    public ConcurrentHashSet(IEnumerable<T> collection)
    {
        _hashSet = new HashSet<T>(collection);
    }

    public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(collection, comparer);
    }

    protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
    {
        _hashSet = new HashSet<T>();

        // not sure about this one really...
        var iSerializable = _hashSet as ISerializable;
        iSerializable.GetObjectData(info, context);
    }

    #region Dispose

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
            if (_lock != null)
                _lock.Dispose();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _hashSet.GetEnumerator();
    }

    ~ConcurrentHashSet()
    {
        Dispose(false);
    }

    public void OnDeserialization(object sender)
    {
        _hashSet.OnDeserialization(sender);
    }

    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        _hashSet.GetObjectData(info, context);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    #endregion

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Add(item);
        }
        finally
        {
            if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void UnionWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.UnionWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void IntersectWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.IntersectWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void ExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.ExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void SymmetricExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.SymmetricExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Overlaps(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Overlaps(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool SetEquals(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.SetEquals(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    bool ISet<T>.Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Add(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Clear();
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Contains(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Contains(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.CopyTo(array, arrayIndex);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Remove(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Remove(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public int Count
    {
        get
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Count;
            }
            finally
            {
                if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }

        }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
}

Я обнаружил, что ни простой блокировки методов добавления и удаления System.Collections.Generic.HashSet, ни упаковки ConcurrentDictionary фреймворка недостаточно в сценариях с «высокой пропускной способностью», требующих хорошей производительности.

Достаточно хорошей производительности уже можно достичь, используя эту простую идею:

      public class ExampleHashSet<T>
{
    const int ConcurrencyLevel = 124;
    const int Lower31BitMask = 0x7FFFFFFF;
    
    HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
    IEqualityComparer<T> comparer;

    public ExampleHashSet()
    {
        comparer = EqualityComparer<T>.Default;

        for(int i = 0; i < ConcurrencyLevel; i++)
            sets[i] = new HashSet<T>();
    }

    public bool Add(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Add(item);
        }
    }

    public bool Remove(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Remove(item);
        }
    }

    // further methods ...
}

Системный HashSet обернут, но, в отличие от других ответов, мы удерживаем блокировки на нескольких HashSet. Различные потоки могут «работать» с разными HashSet, уменьшая общее время ожидания.

Эту идею можно обобщить и реализовать непосредственно в самом HashSet(удерживание блокировок на бакетах вместо блокирования полных наборов). Пример можно найти здесь .

Для наиболее распространенных задач должно быть достаточно следующего производного класса. Чтобы получить полноценный хеш-набор, вы все равно можете использоватьConcurrentDictionaryв качестве вспомогательного поля реализуйте все интерфейсы и передавайте вызовы в словарь.

Простая реализация

      public class ConcurrentHashSet<T> : ConcurrentDictionary<T, byte>
    where T : notnull
{
    const byte DummyByte = byte.MinValue;

    // For convenience, we add HashSet equivalent APIs here...
    public bool Contains(T item) => ContainsKey(item);
    public bool Add(T item) => TryAdd(item, DummyByte);
    public bool Remove(T item) => TryRemove(item, out _);
}

Ограничения

  1. Нет поддержки дляnullценности.
  2. Будет вести себя как словарь. Например, перечисление, сериализация...
  3. HashSetопределенные интерфейсы отсутствуют.
Другие вопросы по тегам