TPL Dataflow Complete Pipeline, когда условие соответствует

Я думал, что это очень простой подход, но я пока не нашел ни одного примера. У меня есть один производитель и один потребитель, и я хочу закончить конвейер, когда было обработано как минимум x объектов. Дополнительно мне нужно знать, какие объекты были получены.

Вот как я это делаю:

public class BlockTester
{
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        _worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        var consumeTask = Consume(buffer);

        _worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});

        foreach (var value in Enumerable.Range(0,100))
        {
            _worker.Post(value);
        }

        _worker.Complete();

        await buffer.Completion;

        if(buffer.TryReceiveAll(out var received))
        {
            Console.WriteLine(string.Join(", ", received));
        }
    }

    public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
    {
        var received = new List<int>();

        while (await buffer.OutputAvailableAsync())
        {
            var current = buffer.Receive();

            received.Add(current);

            if (current > 25)
            {
                _worker.Complete();
            }
        }

        return received;
    }
}

Я немного запутался насчет буфера. Попробуйте все. В чем разница между ожиданием задачи потребления и TryReceiveAll? Почему TryReceiveAll ложно в моем сценарии? Я думаю, что все еще что-то не так с моим подходом для достижения моих целей

1 ответ

Решение

Ваш Consume метод должен быть ActionBlock, Там нет необходимости использовать OutputAvailableAsync или же TryRecieveAll, Заменить BufferBlock с ActionBlock и сделать обработку в пределах ActionBlock, Непонятно, зачем вам TransformBlock либо если у вас нет более одного шага в процессе.

public class BlockTester
{
    //Could be removed
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        //Could be removed
        _worker = new TransformBlock<int, int>(s => s + s);
        var processor = new ActionBlock<int>(x => ProcessMessage(x));

        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100))
        {
            _worker.Post(value);
        }

        //_worker.Complete();

        await processor.Completion;
    }


    private static int itemsRecieved = 0;
    public static void ProcessMessage(int x)
    {
        Interlocked.Increment(ref itemsRecieved);
        if (itemsRecieved > 25) _worker.Complete();
        //process the message
        //log the message etc.
    }
}

Или со сложным объектом сообщения:

public class Message { }

public class BlockTester
{
    //Could be removed
    private static TransformBlock<Message, Message> _worker;

    public static async Task StartAsync()
    {
        //Could be removed
        _worker = new TransformBlock<Message, Message>(s => s);
        var processor = new ActionBlock<Message>(x => ProcessMessage(x));

        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
        {
            _worker.Post(value);
        }

        //_worker.Complete();

        await processor.Completion;
    }


    private static ConcurrentBag<Message> itemsRecieved = new ConcurrentBag<Message>();
    public static void ProcessMessage(Message x)
    {
        itemsRecieved.Add(x);
        if (itemsRecieved.Count > 25) _worker.Complete();
        //process the message
        //log the message etc.
    }
}

Изменить Чтобы ответить на оригинальный вопрос:

Почему TryReceiveAll вернуть ложь:

Потому что к тому времени TryReceiveAll это побежал BufferBlock "завершено". Для завершения блока он должен содержать 0 элементов в своем выходном буфере. Consume метод вытаскивал все элементы до того, как блоку позволили завершить, и вы, наконец, позвоните TryRecieveAll на пустом блоке.

Другие вопросы по тегам