Task.WhenAny действует как Task.WhenAll
Я написал небольшую программу для тестирования с использованием BufferBlock (System.Threading.Tasks.Dataflow) для реализации очереди потребитель-производитель с двумя приоритетами.
Потребитель всегда должен сначала использовать любые элементы из очереди с высоким приоритетом.
В этом первоначальном тесте у меня был производитель, работающий с гораздо более медленной скоростью, чем потребитель, поэтому данные должны выходить в том же порядке, в котором они были, независимо от приоритета.
Тем не менее, я считаю, что результатом Task.WhenAny()
не завершается, пока в обеих очередях нет чего-либо (или есть завершение), таким образом, действует как Task.WhenAll()
,
Я думал, что понял async
/await
и я прочитал "Параллельность в C# Cookbook". Однако что-то происходит, чего я не понимаю.
Есть идеи?
Код:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // add nuget package, 4.8.0
using static System.Console;
namespace DualBufferBlockExample { // .Net Framework 4.6.1
class Program {
private static async Task Produce(BufferBlock<int> queueLo, BufferBlock<int> queueHi, IEnumerable<int> values) {
await Task.Delay(10);
foreach(var value in values) {
if(value == 3 || value == 7)
await queueHi.SendAsync(value);
else
await queueLo.SendAsync(value);
WriteLine($"Produced {value} qL.Cnt={queueLo.Count} qH.Cnt={queueHi.Count}");
await Task.Delay(1000); // production lag
}
queueLo.Complete();
queueHi.Complete();
}
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queueLo, BufferBlock<int> queueHi) {
var results = new List<int>();
while(true) {
int value = -1;
while(queueLo.Count > 0 || queueHi.Count > 0) { // take from hi-priority first
if(queueHi.TryReceive(out value) ||
queueLo.TryReceive(out value)) { // process value
results.Add(value);
WriteLine($" Consumed {value}");
await Task.Delay(100); // consumer processing time shorter than production
}
}
var hasNorm = queueHi.OutputAvailableAsync();
var hasLow = queueLo.OutputAvailableAsync();
var anyT = await Task.WhenAny(hasNorm, hasLow); // <<<<<<<<<< behaves like WhenAll
WriteLine($" WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");
if(!anyT.Result)
break; // both queues are empty & complete
}
return results;
}
static async Task TestDataFlow() {
var queueLo = new BufferBlock<int>();
var queueHi = new BufferBlock<int>();
// Start the producer and consumer.
var consumer = Consume(queueLo, queueHi);
WriteLine("Consumer Started");
var producer = Produce(queueLo, queueHi, Enumerable.Range(0, 10));
WriteLine("Producer Started");
// Wait for everything to complete.
await Task.WhenAll(producer, consumer, queueLo.Completion, queueHi.Completion);
// show consumer's output
var results = await consumer;
Write("Results:");
foreach(var x in results)
Write($" {x}");
WriteLine();
}
static void Main(string[] args) {
try {
TestDataFlow().Wait();
} catch(Exception ex) {
WriteLine($"TestDataFlow exception: {ex.ToString()}");
}
ReadLine();
}
}
}
Выход:
Consumer Started
Producer Started
Produced 0 qL.Cnt=1 qH.Cnt=0
Produced 1 qL.Cnt=2 qH.Cnt=0
Produced 2 qL.Cnt=3 qH.Cnt=0
Produced 3 qL.Cnt=3 qH.Cnt=1
WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
Consumed 3
Consumed 0
Consumed 1
Consumed 2
Produced 4 qL.Cnt=1 qH.Cnt=0
Produced 5 qL.Cnt=2 qH.Cnt=0
Produced 6 qL.Cnt=3 qH.Cnt=0
Produced 7 qL.Cnt=3 qH.Cnt=1
WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
Consumed 7
Consumed 4
Consumed 5
Consumed 6
Produced 8 qL.Cnt=1 qH.Cnt=0
Produced 9 qL.Cnt=2 qH.Cnt=0
WhenAny True qL.Result=True qH.Result=False qL.Count=2 qH.Count=0
Consumed 8
Consumed 9
WhenAny False qL.Result=False qH.Result=False qL.Count=0 qH.Count=0
Results: 3 0 1 2 7 4 5 6 8 9
1 ответ
После звонка WhenAny
Вы сразу же блокируете обе задачи, используя .Result
не зная, что они оба завершены.
var anyT = await Task.WhenAny(hasNorm, hasLow);
//This line blocks on both the hasNorm and hasLow tasks preventing execution from continuing.
WriteLine($" WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");
awaiting
обе задачи также дадут вам одинаковое поведение. Лучшее, что вы можете сделать, это await
задание вернулось из WhenAny
и печатать только результаты выполненного задания.
Кроме того, приоритетная очередь не является чем-то TPL-Dataflow
преуспевает из коробки. Он обрабатывает все сообщения одинаково, так что вы заканчиваете подключать свою собственную приоритетную реализацию. Тем не менее, вы можете заставить его работать.