Потребитель BlockingCollection повторяет вывод
TL;DR У меня есть приложение, которое читает сообщения с USB-устройства в фоновом режиме и отображает сообщения на экране. Я использую BlockingCollection, так как мне нужно быстро читать сообщения, чтобы устройство не получало BufferOverflow.
Я читаю такие сообщения (мой продюсер):
private void ReadMessages(BlockingCollection<object> logMessages)
{
uint numMsgs;
Status status;
Message[] msgs = new Message[10];
while(!logMessages.IsAddingCompleted)
{
numMsgs = (uint) msgs.Length;
status = readMessages(channel, msgs, ref numMsgs, 1000);
if(status == Status.ERR_BUFFER_OVERFLOW)
{
logMessages.Add("BUFFER OVERFLOW - MESSAGES LOST!");
logMessages.Add(CopyMessages(msgs, numMsgs));
}
else if(status == Status.STATUS_NOERROR)
{
logMessages.Add(CopyMessages(msgs, numMsgs));
}
else
{
throw new Exception("Error");
}
}
Метод readMessages() заполнит msgs
массив с Message
объекты читаются, а numMsgs
ссылка содержит количество прочитанных сообщений (до 10). Я использую функцию под названием CopyMessages()
так что я только передаю Message[]
это правильный размер. т.е. если прочитано 5 сообщений, я отправляю Message[5]
вместо Message[10]
,
Я читаю сообщения (мой потребитель) следующим образом:
private void DisplayMessages(BlockingCollection<object> messages)
{
string[] msgs;
try
{
foreach (var item in messages.GetConsumingEnumerable(_cancellationTokenSource.Token))
{
if (item is string)
{
msgs = new string[] { item.ToString() };
}
else if (item is PassThruMsg[])
{
msgs = FormatMessages((PassThruMsg[])item);
}
else
{
msgs = new string[0];
}
Task.Factory.StartNew(new Action(() => outputTextBox.AppendText(String.Join(Environment.NewLine, msgs) + Environment.NewLine)), _cancellationTokenSource.Token, TaskCreationOptions.None, uiContext);
}
}
catch (OperationCanceledException)
{
//TODO:
}
}
Я запускаю задачи внутри нажатия кнопки, вот так:
var results = new BlockingCollection<object>();
var display = Task.Factory.StartNew(() => DisplayMessages(results));
var readMessages = Task.Factory.StartNew(() => ReadMessages(results));
Task[] tasks = new Task[] { display, readMessages };
try
{
await Task.Factory.ContinueWhenAll(tasks, result => { results.CompleteAdding(); }, _cancellationTokenSource.Token, TaskContinuationOptions.None, uiContext);
}
catch (TaskCanceledException)
{
//TODO:
}
Это отлично работает, и при бездействии печатает сообщения с устройства без проблем. Однако после того, как устройство начинает выполнять работу под очень большой нагрузкой (потребитель вызывается так быстро, что он временно блокирует пользовательский интерфейс), я замечаю, что выходное текстовое поле повторяет значения. Насколько я понимаю, что GetConsumingEnumerable()
также удаляет элементы из коллекции блокировок, но я не знаю, почему иначе я бы видел сообщения, напечатанные несколько раз. Каждое сообщение имеет временную метку, и когда я читаю сообщения с устройства, оно очищает буфер, поэтому я знаю, что я не читаю это сообщение несколько раз.
Я что-то здесь упускаю? Есть ли лучший способ справиться с этим сценарием производителя / потребителя для обеспечения точных данных? Я посмотрел, есть ли где-нибудь ссылки, которые могут перекрываться, но я этого не вижу.