Task.WhenAny действует как Task.WhenAll

Я написал небольшую программу для тестирования с использованием BufferBlock (System.Threading.Tasks.Dataflow) для реализации очереди потребитель-производитель с двумя приоритетами.

Потребитель всегда должен сначала использовать любые элементы из очереди с высоким приоритетом.

В этом первоначальном тесте у меня был производитель, работающий с гораздо более медленной скоростью, чем потребитель, поэтому данные должны выходить в том же порядке, в котором они были, независимо от приоритета.

Тем не менее, я считаю, что результатом Task.WhenAny() не завершается, пока в обеих очередях нет чего-либо (или есть завершение), таким образом, действует как Task.WhenAll(),

Я думал, что понял async/awaitи я прочитал "Параллельность в C# Cookbook". Однако что-то происходит, чего я не понимаю.

Есть идеи?

Код:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;  // add nuget package, 4.8.0
using static System.Console;

namespace DualBufferBlockExample { // .Net Framework 4.6.1

    class Program {
        private static async Task Produce(BufferBlock<int> queueLo, BufferBlock<int> queueHi, IEnumerable<int> values) {
            await Task.Delay(10);
            foreach(var value in values) {
                if(value == 3 || value == 7)
                    await queueHi.SendAsync(value);
                else
                    await queueLo.SendAsync(value);
                WriteLine($"Produced {value}  qL.Cnt={queueLo.Count} qH.Cnt={queueHi.Count}");
                await Task.Delay(1000);  // production lag
            }

            queueLo.Complete();
            queueHi.Complete();
        }
        private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queueLo, BufferBlock<int> queueHi) {
            var results = new List<int>();

            while(true) {
                int value = -1;

                while(queueLo.Count > 0 || queueHi.Count > 0) {  // take from hi-priority first
                    if(queueHi.TryReceive(out value) ||
                        queueLo.TryReceive(out value)) {  // process value
                        results.Add(value);
                        WriteLine($"    Consumed {value}");
                        await Task.Delay(100); // consumer processing time shorter than production
                    }
                }

                var hasNorm = queueHi.OutputAvailableAsync();
                var hasLow = queueLo.OutputAvailableAsync();
                var anyT = await Task.WhenAny(hasNorm, hasLow);  // <<<<<<<<<< behaves like WhenAll
                WriteLine($"  WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");

                if(!anyT.Result)
                    break;  // both queues are empty & complete
            }

            return results;
        }
        static async Task TestDataFlow() {
            var queueLo = new BufferBlock<int>();
            var queueHi = new BufferBlock<int>();

            // Start the producer and consumer.
            var consumer = Consume(queueLo, queueHi);
            WriteLine("Consumer Started");

            var producer = Produce(queueLo, queueHi, Enumerable.Range(0, 10));
            WriteLine("Producer Started");

            // Wait for everything to complete.
            await Task.WhenAll(producer, consumer, queueLo.Completion, queueHi.Completion);

            // show consumer's output
            var results = await consumer;

            Write("Results:");
            foreach(var x in results)
                Write($" {x}");
            WriteLine();
        }
        static void Main(string[] args) {
            try {
                TestDataFlow().Wait();
            } catch(Exception ex) {
                WriteLine($"TestDataFlow exception: {ex.ToString()}");
            }
            ReadLine();
        }
    }
}

Выход:

Consumer Started
Producer Started
Produced 0  qL.Cnt=1 qH.Cnt=0
Produced 1  qL.Cnt=2 qH.Cnt=0
Produced 2  qL.Cnt=3 qH.Cnt=0
Produced 3  qL.Cnt=3 qH.Cnt=1
  WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
    Consumed 3
    Consumed 0
    Consumed 1
    Consumed 2
Produced 4  qL.Cnt=1 qH.Cnt=0
Produced 5  qL.Cnt=2 qH.Cnt=0
Produced 6  qL.Cnt=3 qH.Cnt=0
Produced 7  qL.Cnt=3 qH.Cnt=1
  WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
    Consumed 7
    Consumed 4
    Consumed 5
    Consumed 6
Produced 8  qL.Cnt=1 qH.Cnt=0
Produced 9  qL.Cnt=2 qH.Cnt=0
  WhenAny True qL.Result=True qH.Result=False qL.Count=2 qH.Count=0
    Consumed 8
    Consumed 9
  WhenAny False qL.Result=False qH.Result=False qL.Count=0 qH.Count=0
Results: 3 0 1 2 7 4 5 6 8 9

1 ответ

Решение

После звонка WhenAny Вы сразу же блокируете обе задачи, используя .Result не зная, что они оба завершены.

var anyT = await Task.WhenAny(hasNorm, hasLow);

//This line blocks on both the hasNorm and hasLow tasks preventing execution from continuing. 
WriteLine($"  WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");

awaiting обе задачи также дадут вам одинаковое поведение. Лучшее, что вы можете сделать, это await задание вернулось из WhenAny и печатать только результаты выполненного задания.

Кроме того, приоритетная очередь не является чем-то TPL-Dataflow преуспевает из коробки. Он обрабатывает все сообщения одинаково, так что вы заканчиваете подключать свою собственную приоритетную реализацию. Тем не менее, вы можете заставить его работать.

Другие вопросы по тегам