Сообщения теряются при использовании из BlockingCollection в пакетах

Я пытался изобрести метод для потребления партии из BlockingCollection и попал в беду.

Вот минимальное воспроизведение:

internal class Program
{
    private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
    private static int _consumed;

    static void Main()
    {
        Task.Run(() => Producer());
        Task.Run(() => Consumer());
        Console.WriteLine("press [ENTER] to check");
        while (true)
        {
            Console.ReadLine();
            Console.WriteLine("consumed: " + _consumed);
        }
    }

    private static void Producer()
    {
        for (var i = 0; i < 5000; i++)
            _bc.Add("msg");
    }

    private static void Consumer()
    {
        foreach (var s in _bc.GetConsumingEnumerable())
        {
            var batchSize = _bc.Count + 1;
            var batch = new List<string>(batchSize) { s };
            while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
                batch.Add(additionalResult);
            _consumed = _consumed + batch.Count;
        }
    }
}

Несколько сообщений потеряны (но не всегда одно и то же число). Если вы не можете воспроизвести его, попробуйте увеличить количество создаваемых сообщений.

То, что я пытаюсь достичь, это использовать GetConsumingEnumerable метод в потребителе (через некоторое время я буду называть CompleteAdding) и возможность собирать пакеты сообщений некоторого размера, если они уже присутствуют.

В чем причина потери сообщений и как правильно их использовать?

2 ответа

 [__DynamicallyInvokable]
    public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
    {
      ...
        while (!this.IsCompleted)
        {
          T obj;
          if (this.TryTakeWithNoTimeValidation(out obj, -1, cancellationToken, linkedTokenSource))
            yield return obj;
        }
      ...
    }

а также

public bool TryTake(out T item)
{
  ...
  return this.TryTakeWithNoTimeValidation(out item, (int) timeout.TotalMilliseconds, CancellationToken.None, (CancellationTokenSource) null);
}

оба метода TryTake и GetConsumingEnumerable используют метод TryTakeWithNoTimeValidation . Я предполагаю, что отсутствующие элементы были удалены из коллекции с помощью GetConsumingEnumerable. рассмотрим следующий пример:

private static void Producer()
{
    Console.WriteLine($"begin produce isCompleted:{_bc.IsCompleted}");
    for (var i = 0; i < 5000; i++)
        _bc.Add($"msg:{i}");
    _bc.CompleteAdding();
    Console.WriteLine($"end produce isCompleted:{_bc.IsCompleted}");
}
var batch = new List<string>();
foreach (var s in _bc.GetConsumingEnumerable())
{
    batch.Add(s);
    if (_bc.IsCompleted && _bc.Count == 0)
    {
       break;
    }
}
Console.WriteLine($"first:{batch.First()}, last:{batch.Last()}");
Console.WriteLine($"consumed:{batch.Count}");

_bc пусто. Есть несколько способов реализации вашего алгоритма, один из которых я рекомендую использовать Take и вызывать потребителя перед производителем (который блокирует вызывающий поток).

Вот это да. Это ошибка. Эта линия

while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)

должно быть

while (batch.Count < batchSize && _bc.TryTake(out var additionalResult))

так как первое условие имеет побочный эффект удаления элемента из коллекции.

Другие вопросы по тегам