Потребитель BlockingCollection повторяет вывод

TL;DR У меня есть приложение, которое читает сообщения с USB-устройства в фоновом режиме и отображает сообщения на экране. Я использую BlockingCollection, так как мне нужно быстро читать сообщения, чтобы устройство не получало BufferOverflow.

Я читаю такие сообщения (мой продюсер):

private void ReadMessages(BlockingCollection<object> logMessages)
{
   uint numMsgs;
   Status status;
   Message[] msgs = new Message[10];
   while(!logMessages.IsAddingCompleted)
   {
      numMsgs = (uint) msgs.Length;
      status = readMessages(channel, msgs, ref numMsgs, 1000);
      if(status == Status.ERR_BUFFER_OVERFLOW)
      {
         logMessages.Add("BUFFER OVERFLOW - MESSAGES LOST!");
         logMessages.Add(CopyMessages(msgs, numMsgs));
      }
      else if(status == Status.STATUS_NOERROR)
      {
         logMessages.Add(CopyMessages(msgs, numMsgs));
      }
      else
      {
         throw new Exception("Error");
      }
}

Метод readMessages() заполнит msgs массив с Message объекты читаются, а numMsgs ссылка содержит количество прочитанных сообщений (до 10). Я использую функцию под названием CopyMessages() так что я только передаю Message[] это правильный размер. т.е. если прочитано 5 сообщений, я отправляю Message[5] вместо Message[10],

Я читаю сообщения (мой потребитель) следующим образом:

private void DisplayMessages(BlockingCollection<object> messages)
{
    string[] msgs;
    try
    {
        foreach (var item in messages.GetConsumingEnumerable(_cancellationTokenSource.Token))
        {
            if (item is string)
            {
                msgs = new string[] { item.ToString() };
            }
            else if (item is PassThruMsg[])
            {
                msgs = FormatMessages((PassThruMsg[])item);
            }
            else
            {
                msgs = new string[0];
            }

            Task.Factory.StartNew(new Action(() => outputTextBox.AppendText(String.Join(Environment.NewLine, msgs) + Environment.NewLine)), _cancellationTokenSource.Token, TaskCreationOptions.None, uiContext);
        }
    }
    catch (OperationCanceledException)
    {
        //TODO:
    }
}

Я запускаю задачи внутри нажатия кнопки, вот так:

var results = new BlockingCollection<object>();

var display = Task.Factory.StartNew(() => DisplayMessages(results));
var readMessages = Task.Factory.StartNew(() => ReadMessages(results));

Task[] tasks = new Task[] { display, readMessages };

try
{
    await Task.Factory.ContinueWhenAll(tasks, result => { results.CompleteAdding(); }, _cancellationTokenSource.Token, TaskContinuationOptions.None, uiContext);
}
catch (TaskCanceledException)
{
    //TODO:
}

Это отлично работает, и при бездействии печатает сообщения с устройства без проблем. Однако после того, как устройство начинает выполнять работу под очень большой нагрузкой (потребитель вызывается так быстро, что он временно блокирует пользовательский интерфейс), я замечаю, что выходное текстовое поле повторяет значения. Насколько я понимаю, что GetConsumingEnumerable() также удаляет элементы из коллекции блокировок, но я не знаю, почему иначе я бы видел сообщения, напечатанные несколько раз. Каждое сообщение имеет временную метку, и когда я читаю сообщения с устройства, оно очищает буфер, поэтому я знаю, что я не читаю это сообщение несколько раз.

Я что-то здесь упускаю? Есть ли лучший способ справиться с этим сценарием производителя / потребителя для обеспечения точных данных? Я посмотрел, есть ли где-нибудь ссылки, которые могут перекрываться, но я этого не вижу.

0 ответов

Другие вопросы по тегам