Нет ConcurrentList<T> в.Net 4.0?
Я был взволнован, чтобы увидеть новый System.Collections.Concurrent
Пространство имен в.Net 4.0, довольно приятно! я видел ConcurrentDictionary
, ConcurrentQueue
, ConcurrentStack
, ConcurrentBag
а также BlockingCollection
,
Одна вещь, которая, кажется, таинственно отсутствует, это ConcurrentList<T>
, Должен ли я сам написать это (или убрать это из Интернета:))?
Я что-то упускаю здесь очевидное?
12 ответов
Я дал ему попытку некоторое время назад (также: на GitHub). В моей реализации были некоторые проблемы, о которых я не буду здесь говорить. Позвольте мне рассказать вам, что более важно, что я узнал.
Во-первых, вы никак не сможете получить полную реализацию IList<T>
это без блокировки и потокобезопасно. В частности, случайные вставки и удаления не будут работать, если вы также не забудете о O(1) произвольном доступе (то есть, если вы не "обманываете" и просто используете какой-то связанный список и не позволяете индексации сосать).
То, что я думал, могло бы быть полезным, было потокобезопасным, ограниченным подмножеством IList<T>
в частности тот, который позволил бы Add
и обеспечить произвольный доступ только для чтения по индексу (но нет Insert
, RemoveAt
и т. д., а также нет произвольного доступа для записи).
Это была цель моего ConcurrentList<T>
реализация. Но когда я проверил его производительность в многопоточных сценариях, я обнаружил, что простая синхронизация добавляет к List<T>
было быстрее. В основном, добавление к List<T>
молниеносно уже; сложность вычислительных шагов минимальна (увеличить индекс и присвоить элементу в массиве; это действительно так). Вам понадобится куча одновременных записей, чтобы увидеть какие-либо конфликты блокировок по этому вопросу; и даже тогда средняя производительность каждой записи все равно побьет более дорогую, хотя и без блокировки, реализацию в ConcurrentList<T>
,
В относительно редком случае, когда внутренний массив списка должен изменить свой размер, вы платите небольшую цену. В конечном итоге я пришел к выводу, что это был единственный нишевый сценарий, когда надстройка ConcurrentList<T>
Тип коллекции имеет смысл: когда вы хотите гарантированно низкие накладные расходы на добавление элемента при каждом отдельном вызове (в отличие от амортизированной цели производительности).
Это просто не такой полезный класс, как вы думаете.
Для чего бы вы использовали ConcurrentList?
Концепция контейнера произвольного доступа в многопоточном мире не так полезна, как может показаться. Заявление
if (i < MyConcurrentList.Count)
x = MyConcurrentList[i];
в целом все равно не будет потокобезопасным.
Вместо того, чтобы создавать ConcurrentList, попробуйте создать решения с тем, что там. Наиболее распространенными классами являются ConcurrentBag и особенно BlockingCollection.
При всем уважении к уже полученным отличным ответам, бывают моменты, когда я просто хочу многопоточный IList. Ничего сложного или необычного. Производительность важна во многих случаях, но иногда это не является проблемой. Да, всегда будут проблемы без таких методов, как "TryGetValue" и т. Д., Но в большинстве случаев я просто хочу что-то, что я могу перечислить, не беспокоясь о том, чтобы наложить блокировки вокруг всего. И да, кто-то может найти какую-то "ошибку" в моей реализации, которая может привести к тупику или чему-то еще (я полагаю), но давайте будем честными: если речь идет о многопоточности, если вы не пишете свой код правильно, это в любом случае заходит в тупик. Имея это в виду, я решил сделать простую реализацию ConcurrentList, которая обеспечивает эти основные потребности.
И для чего это стоит: я сделал базовый тест по добавлению 10000000 элементов в обычный List и ConcurrentList, и результаты были следующими:
Список закончен: 7793 миллисекунды. Параллельное завершение: 8064 миллисекунды.
public class ConcurrentList<T> : IList<T>, IDisposable
{
#region Fields
private readonly List<T> _list;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructors
public ConcurrentList()
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>();
}
public ConcurrentList(int capacity)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(capacity);
}
public ConcurrentList(IEnumerable<T> items)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(items);
}
#endregion
#region Methods
public void Add(T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Add(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void Insert(int index, T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Insert(index, item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
try
{
this._lock.EnterWriteLock();
return this._list.Remove(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void RemoveAt(int index)
{
try
{
this._lock.EnterWriteLock();
this._list.RemoveAt(index);
}
finally
{
this._lock.ExitWriteLock();
}
}
public int IndexOf(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.IndexOf(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void Clear()
{
try
{
this._lock.EnterWriteLock();
this._list.Clear();
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.Contains(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
try
{
this._lock.EnterReadLock();
this._list.CopyTo(array, arrayIndex);
}
finally
{
this._lock.ExitReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
~ConcurrentList()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
GC.SuppressFinalize(this);
this._lock.Dispose();
}
#endregion
#region Properties
public T this[int index]
{
get
{
try
{
this._lock.EnterReadLock();
return this._list[index];
}
finally
{
this._lock.ExitReadLock();
}
}
set
{
try
{
this._lock.EnterWriteLock();
this._list[index] = value;
}
finally
{
this._lock.ExitWriteLock();
}
}
}
public int Count
{
get
{
try
{
this._lock.EnterReadLock();
return this._list.Count;
}
finally
{
this._lock.ExitReadLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
#endregion
}
public class ConcurrentEnumerator<T> : IEnumerator<T>
{
#region Fields
private readonly IEnumerator<T> _inner;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructor
public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
{
this._lock = @lock;
this._lock.EnterReadLock();
this._inner = inner.GetEnumerator();
}
#endregion
#region Methods
public bool MoveNext()
{
return _inner.MoveNext();
}
public void Reset()
{
_inner.Reset();
}
public void Dispose()
{
this._lock.ExitReadLock();
}
#endregion
#region Properties
public T Current
{
get { return _inner.Current; }
}
object IEnumerator.Current
{
get { return _inner.Current; }
}
#endregion
}
Причина, по которой нет ConcurrentList, заключается в том, что он принципиально не может быть записан. Причина в том, что некоторые важные операции в IList основаны на индексах, и это просто не сработает. Например:
int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");
Эффект, к которому стремится автор, заключается в вставке слова "собака" перед словом "кошка", но в многопоточной среде между этими двумя строками кода может произойти что угодно. Например, другой поток может сделать list.RemoveAt(0)
, сдвигая весь список влево, но главное, catIndex не изменится. Влияние здесь заключается в том, чтоInsert
Операция фактически поставит "собаку" за кошкой, а не перед ней.
Несколько реализаций, которые вы видите как "ответы" на этот вопрос, имеют хорошие значения, но, как показано выше, они не дают надежных результатов. Если вы действительно хотите использовать семантику в виде списков в многопоточной среде, вы не сможете добиться этого, установив блокировки внутри методов реализации списков. Вы должны убедиться, что любой индекс, который вы используете, полностью живет в контексте блокировки. В результате вы можете использовать List в многопоточной среде с правильной блокировкой, но сам список не может существовать в этом мире.
Если вы считаете, что вам нужен параллельный список, на самом деле есть только две возможности:
- Что вам действительно нужно, так это ConcurrentBag
- Вам необходимо создать свою собственную коллекцию, возможно, реализованную с помощью List и собственного управления параллелизмом.
Если у вас есть ConcurrentBag и вы находитесь в положении, когда вам нужно передать его как IList, у вас есть проблема, потому что метод, который вы вызываете, указал, что они могут попытаться сделать что-то, как я делал выше, с cat & собака. В большинстве миров это означает, что вызываемый вами метод просто не предназначен для работы в многопоточной среде. Это означает, что вы либо реорганизуете его так, чтобы оно было, либо, если вы не можете, вам придется обращаться с ним очень осторожно. Вы почти наверняка будете обязаны создавать свою собственную коллекцию со своими собственными блокировками и вызывать вызывающий сбой метод внутри блокировки.
ConcurrentList
(как массив с изменяемыми размерами, а не как связанный список) нелегко написать с неблокирующими операциями. Его API плохо переводится в "параллельную" версию.
В тех случаях, когда операции чтения значительно превышают количество записей или (хотя и часто) записи выполняются не одновременно, подход " копирование при записи" может быть уместным.
Реализация, показанная ниже
- беззамочные
- невероятно быстрая для одновременных чтений, даже если параллельные модификации продолжаются - независимо от того, сколько времени они занимают
- поскольку "снимки" являются неизменяемыми, возможна атомарность без блокировки, т.е.
var snap = _list; snap[snap.Count - 1];
никогда не будет (ну, конечно, кроме пустого списка) сгенерировать, и вы также получите поточно-безопасное перечисление с семантикой снимков бесплатно... как я ЛЮБЛЮ неизменность! - реализовано в общем, применимо к любой структуре данных и любому типу модификации
- очень просто, т.е. легко тестировать, отлаживать, проверять, читая код
- можно использовать в.Net 3.5
Для того, чтобы копирование при записи работало, вы должны поддерживать неизменную структуру ваших данных, то есть никому не разрешается изменять их после того, как вы сделали их доступными для других потоков. Когда вы хотите изменить, вы
- клонировать структуру
- внести изменения в клон
- атомно поменять местами ссылку на модифицированный клон
Код
static class CopyOnWriteSwapper
{
public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
where T : class
{
while (true)
{
var objBefore = Volatile.Read(ref obj);
var newObj = cloner(objBefore);
op(newObj);
if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
return;
}
}
}
использование
CopyOnWriteSwapper.Swap(ref _myList,
orig => new List<string>(orig),
clone => clone.Add("asdf"));
Если вам нужна более высокая производительность, это поможет отменить генерацию метода, например, создать один метод для каждого типа модификации (Add, Remove, ...), который вы хотите, и жестко закодировать указатели на функции cloner
а также op
,
NB #1 Вы несете ответственность за то, чтобы никто не изменил (предположительно) неизменную структуру данных. Мы ничего не можем сделать в общей реализации, чтобы предотвратить это, но когда мы специализируемся на List<T>
, вы можете защититься от модификации, используя List.AsReadOnly ()
NB #2 Будьте осторожны со значениями в списке. Приведенный выше подход копирования при записи защищает только их членство в списке, но если вы поместите туда не строки, а некоторые другие изменяемые объекты, вы должны позаботиться о безопасности потоков (например, блокировка). Но это ортогонально этому решению, и, например, блокировка изменяемых значений может быть легко использована без проблем. Вам просто нужно знать об этом.
NB #3. Если ваша структура данных огромна, и вы часто ее модифицируете, подход "копировать все при записи" может быть чрезмерным как с точки зрения потребления памяти, так и с точки зрения затрат на копирование ЦП. В этом случае вы можете использовать неизменные коллекции MS.
System.Collections.Generic.List<t>
уже потокобезопасен для нескольких читателей. Попытка сделать это потокобезопасным для нескольких авторов не имеет смысла. (По причинам, которые Хенк и Стивен уже упоминали)
Некоторые люди высветили некоторые пункты товара (и некоторые мои мысли):
- Это может выглядеть безумным для неспособного случайного доступа (индексатора), но мне кажется, что это нормально. Нужно только думать, что в многопоточных коллекциях существует множество методов, которые могут не работать, например, Indexer и Delete. Вы также можете определить действие сбоя (отступления) для средства доступа для записи, например "fail" или просто "add in the end".
- Не потому, что это многопоточная коллекция, она всегда будет использоваться в многопоточном контексте. Или это может также использоваться только одним писателем и одним читателем.
- Другой способ безопасного использования индексатора может заключаться в том, чтобы обернуть действия в блокировку коллекции, используя ее корень (если он открыт).
- Для многих людей сделать rootLock видимым - это "Хорошая практика". Я не уверен на 100% в этом вопросе, потому что, если он скрыт, вы теряете большую гибкость для пользователя. Мы всегда должны помнить, что программирование многопоточности не для кого-либо. Мы не можем предотвратить любое неправильное использование.
- Microsoft придется проделать определенную работу и определить новый стандарт для правильного использования многопоточной коллекции. Во-первых, IEnumerator не должен иметь moveNext, но должен иметь GetNext, который возвращает true или false и получает выходной параметр типа T (таким образом, итерация больше не будет блокировать). Кроме того, Microsoft уже использует "использование" внутри себя в foreach, но иногда использует IEnumerator напрямую, не заключая его в "использование" (ошибка в представлении коллекции и, возможно, в других местах). Использование IEnumerator в качестве обертки рекомендуется Microsoft. Эта ошибка удаляет хороший потенциал для безопасного итератора... Итератор, который блокирует коллекцию в конструкторе и разблокирует его методом Dispose - для блокирующего метода foreach.
Это не ответ. Это только комментарии, которые не совсем соответствуют конкретному месту.
... Мой вывод, Microsoft должна внести некоторые глубокие изменения в "foreach", чтобы сделать коллекцию MultiThreaded более легкой в использовании. Также он должен следовать собственным правилам использования IEnumerator. До этого мы можем легко написать MultiThreadList, который будет использовать блокирующий итератор, но он не будет следовать за "IList". Вместо этого вам придется определить собственный интерфейс "IListPersonnal", который может завершиться ошибкой при "вставке", "удалении" и произвольном доступе (индексаторе) без исключения. Но кто захочет использовать его, если он не стандартный?
Я реализовал один похожий на Брайана. Мой отличается:
- Я управляю массивом напрямую.
- Я не вхожу в замки в блоке try.
- я использую
yield return
для изготовления счетчика. - Я поддерживаю рекурсию блокировки. Это позволяет читать из списка во время итерации.
- Я использую обновляемые блокировки чтения, где это возможно.
DoSync
а такжеGetSync
методы, позволяющие последовательные взаимодействия, которые требуют эксклюзивного доступа к списку.
Код:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
В последовательно выполняемом коде используемые структуры данных отличаются от (хорошо написанного) одновременно исполняемого кода. Причина в том, что последовательный код подразумевает неявный порядок. Параллельный код, однако, не подразумевает какого-либо порядка; еще лучше это означает отсутствие какого-либо определенного порядка!
Из-за этого структуры данных с подразумеваемым порядком (например, List) не очень полезны для решения параллельных задач. Список подразумевает порядок, но он не дает четкого определения, что это за порядок. Из-за этого порядок выполнения кода, управляющего списком, будет определять (до некоторой степени) неявный порядок списка, который находится в прямом конфликте с эффективным параллельным решением.
Помните, что параллелизм - это проблема с данными, а не проблема с кодом! Вы не можете сначала реализовать код (или переписать существующий последовательный код) и получить хорошо спроектированное параллельное решение. Сначала необходимо спроектировать структуры данных, помня о том, что неявное упорядочение не существует в параллельной системе.
Подход без блокировок Copy and Write отлично работает, если вы не имеете дело с слишком большим количеством элементов. Вот класс, который я написал:
public class CopyAndWriteList<T>
{
public static List<T> Clear(List<T> list)
{
var a = new List<T>(list);
a.Clear();
return a;
}
public static List<T> Add(List<T> list, T item)
{
var a = new List<T>(list);
a.Add(item);
return a;
}
public static List<T> RemoveAt(List<T> list, int index)
{
var a = new List<T>(list);
a.RemoveAt(index);
return a;
}
public static List<T> Remove(List<T> list, T item)
{
var a = new List<T>(list);
a.Remove(item);
return a;
}
}
пример использования: orders_BUY = CopyAndWriteList.Clear(orders_BUY);
Я удивлен, что никто не упомянул об использовании в качестве основы для написания специализированного класса.
Часто нам не нужны полные API различных классов коллекций, и если вы пишете в основном функциональный код без побочных эффектов, используя, насколько это возможно, неизменяемые классы, то вы фактически НЕ захотите изменять коллекцию в пользу различных реализаций моментальных снимков.
LinkedList решает некоторые сложные проблемы создания моментальных копий / клонов больших коллекций. Я также использую его для создания «поточно-безопасных» перечислителей для перечисления по коллекции. Я могу обмануть, потому что знаю, что я не изменяю коллекцию никаким другим способом, кроме добавления, я могу отслеживать размер списка и блокировать только изменения размера списка. Затем мой код перечислителя просто перечисляет от 0 до n для любого потока, который хочет получить «моментальный снимок» коллекции только для добавления, который будет гарантированно представлять «моментальный снимок» коллекции в любой момент времени, независимо от того, какие другие потоки добавление к заголовку коллекции.
Я почти уверен, что большинство требований часто чрезвычайно просты, и вам нужно всего 2 или 3 метода. Написать действительно универсальную библиотеку ужасно сложно, но решение ваших собственных потребностей в кодах иногда может быть простым с помощью пары уловок.
Да здравствует
LinkedList
Ура, ... люблю вас всех! Al
ps образец взлома
AppendOnly