Почему блоки работают в таком порядке?

Это краткий пример кода, чтобы быстро представить вам, о чем мой вопрос:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }

                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);

            var populateTask = Task.Run(async () =>
            {
                foreach (var x in Enumerable.Range(1, 15))
                {
                    await firstBlock.SendAsync(x);
                }
            });

            populateTask.Wait();
            secondBlock.Completion.Wait();
        }
    }
}

Выход:

09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14

Почему этот порядок и как я могу изменить сеть, чтобы получить вывод ниже?

09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)

Поэтому мне интересно, почему все другие блоки (или задачи здесь) должны ждать отложенного блока?


ОБНОВИТЬ

Так как вы, ребята, попросили меня объяснить мою проблему более подробно, я сделал этот пример, который был ближе к реальному конвейеру, над которым я работаю. Допустим, приложение загружает некоторые данные и вычисляет хэш на основе возвращенного ответа.

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

Давайте посмотрим на порядок запросов:

Запросы

Это определенно имеет смысл. Все запросы сделаны как можно скорее. Медленный четвертый запрос находится в конце списка.

Теперь давайте посмотрим, какой у нас результат:

09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3

Вы можете видеть, что все хэши после третьего были вычислены сразу после получения четвертого ответа.

Таким образом, основываясь на этих двух фактах, мы можем сказать, что все загруженные страницы ожидали медленного четвертого запроса. Лучше не ждать четвертого запроса и вычислять хэши, как только данные будут загружены. Есть ли способ, которым я могу достичь этого?

3 ответа

Решение

Хорошо, по ссылке от @SirRufo я начал думать о реализации своих собственных TransformBlock это будет соответствовать моим потребностям и обрабатывать поступающие товары без учета порядка. Таким образом, это не разрушит сеть, установит разрыв между блоками в части загрузки и станет элегантным способом.

Поэтому я начал смотреть на то, что и как я могу это сделать. Чтобы заглянуть в источники TransformBlock Само по себе казалось хорошей отправной точкой, поэтому я открыл TransformBlock источники на Github и начали его анализировать.

С самого начала урока я обнаружил интересную вещь: // Если используется параллелизм, нам нужно будет поддерживать переупорядочение сообщений, которые завершаются не по порядку.

// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
    _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}

Похоже, именно то, что мы хотим! Давайте посмотрим на это EnsureOrdered вариант в DataflowBlockOptions класс на Github:

/// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes 
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer).  Some blocks may allow this to be relaxed,
/// however.  Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it's able to do so.  This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
    get { return _ensureOrdered; }
    set { _ensureOrdered = value; }
}

Это выглядело действительно хорошо, поэтому я немедленно переключился на IDE, чтобы установить его. К сожалению, не было никаких настроек, подобных этой:

Нет EnsureOrdered

Я продолжал искать и нашел эту заметку:

4.5.25-бета-23019

Пакет был переименован в System.Threading.Tasks.Dataflow

Когда я гуглил и нашел этот пакет, позвонил System.Threading.Tasks.Dataflow! Поэтому я удалил Microsoft.Tpl.Dataflow пакет и установлен System.Threading.Tasks.Dataflow выпуская:

Install-Package System.Threading.Tasks.Dataflow

И был EnsureOrdered вариант. Я обновил код с настройкой EnsureOrdered в false:

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, options);

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, options);

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, options);

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

И результат вывода был именно то, что я хочу:

10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA

Это разработано и задокументировано

Поскольку каждый предопределенный исходный тип блока потока данных гарантирует, что сообщения распространяются в том порядке, в котором они получены, ...

Доказательство:

var ts = Environment.TickCount;

var firstBlock = new TransformBlock<int, int>(
    x => x,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
    } );

var secondBlock = new TransformBlock<int, string>(
    x =>
    {
        var start = Environment.TickCount;

        if ( x == 3 )
        {
            Thread.Sleep( 5000 );
            return $"Start {start-ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }

        return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        // limit the internal queue to 10 items
        BoundedCapacity = 10,
    } );

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // limit to a single task to watch the order
        MaxDegreeOfParallelism = 1,
    } );

firstBlock.LinkTo( secondBlock, new DataflowLinkOptions { PropagateCompletion = true, } );
secondBlock.LinkTo( thirdBlock, new DataflowLinkOptions { PropagateCompletion = true, } );

foreach ( var x in Enumerable.Range( 1, 15 ) )
{
    // to ensure order of items
    firstBlock.Post( x );
}

firstBlock.Complete();
thirdBlock.Completion.Wait();

Выход:

Начало 31 Завершено 31: сообщение 1
Начало 31 Завершено 31: сообщение 2
Начало 31 Завершено 5031: сообщение 3 (задержанное сообщение!)
Начало 31 Завершено 31: сообщение 4
Начало 31 Завершено 31: сообщение 5
Начало 31 Завершено 31: сообщение 6
Начало 31 Завершено 31: сообщение 7
Начало 31 Завершено 31: сообщение 8
Начало 31 Завершено 31: сообщение 9
Начало 31 Завершено 31: сообщение 10
Начало 31 Завершено 31: сообщение 11
Начало 31 Завершено 31: сообщение 12
Начало 5031 Завершено 5031: сообщение 13
Начало 5031 Завершено 5031: сообщение 14
Начало 5031 Завершено 5031: сообщение 15

Решение 1

Не используйте DataFlow для загрузки части, потому что гарантия заказа заблокирует обработку, которую вы ищете.

var ts = Environment.TickCount;

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // limit to a single task to watch the order
        MaxDegreeOfParallelism = 4,
    } );

Parallel.ForEach(
    Enumerable.Range( 1, 15 ),
    new ParallelOptions { MaxDegreeOfParallelism = 4, },
    x =>
    {
        var start = Environment.TickCount;
        string result;

        if ( x == 12 )
        {
            Thread.Sleep( 5000 );
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }
        else
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
        thirdBlock.Post( result );
    } );

thirdBlock.Complete();
thirdBlock.Completion.Wait();

Выход:

Начало 32 Завершено 32: Сообщение 2 Завершено 32 Завершено 32: Сообщение 6 Запущено 32 Завершено 32: Сообщение 5 Завершено 32 Завершено 32: Сообщение 8 Запущено 32 Завершено 32: Сообщение 9 Запущено 32 Завершено 32: Сообщение 10 Запущено 32 Завершено 32: сообщение 11 начало 32 завершено 32: сообщение 7 начало 32 завершено 32: сообщение 13 начало 32 завершено 32: сообщение 14 начало 32 завершено 32: сообщение 15 начало 32 завершено 32: сообщение 3 запущено 32 завершено 32: Сообщение 4 Начало 32 Завершено 32: Сообщение 1 Начало 32 Завершено 5032: Сообщение 12 (Это отложенное сообщение!) 

Решение 2

Конечно, вы можете реализовать IPropagatorBlock<TInput,TOutput> в пользовательском классе, который не гарантирует порядок предметов.

Глядя на временные метки, вывод второго блока работает так, как вы ожидаете, - отложенный TransformBlock запускается после всех остальных TransformBlock. Кажется, это Console.WriteLine в ActionBlock, который вызывается не в том порядке, в котором вы ожидаете.

Ваш код secondBlock.Completion.Wait(); неверно - должно быть thirdBlock.Completion.Wait(); для того, чтобы получить ожидаемые результаты?

Другие вопросы по тегам