BroadcastBlock с гарантированной доставкой в ​​потоке данных TPL

У меня есть поток данных, который я обрабатываю несколькими различными способами... поэтому я хотел бы отправить копию каждого полученного сообщения нескольким целям, чтобы эти цели могли выполняться параллельно... однако мне нужно установить BoundedCapacity на моих блоках, потому что данные передаются намного быстрее, чем мои цели могут с ними справиться, и есть тонна данных. Без BoundedCapacity я бы быстро исчерпал память.

Однако проблема в том, что BroadcastBlock будет отбрасывать сообщения, если цель не может обработать это (из-за BoundedCapacity).

Что мне нужно, так это BroadcastBlock, который не будет отбрасывать сообщения, но по существу откажется от дополнительного ввода, пока не сможет доставить сообщения каждой цели, а затем будет готов к большему.

Есть ли что-то подобное или кто-нибудь написал собственный блок, который ведет себя таким образом?

1 ответ

Решение

Довольно просто построить то, что вы просите, используя ActionBlock а также SendAsync(), что-то вроде:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets)
{
    var targetsList = targets.ToList();

    return new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}

Это самая базовая версия, но расширение ее для поддержки изменяемого списка целей, распространения функции завершения или клонирования должно быть простым.

Вот отточенная версия идеи Свика. В GuaranteedDeliveryBroadcastBlock класс ниже является (почти) полной заменой встроенного BroadcastBlock. Поддерживается связывание и отключение целей в любой момент.

public class GuaranteedDeliveryBroadcastBlock<T> :
    ITargetBlock<T>, ISourceBlock<T>, IPropagatorBlock<T, T>
{
    private class Subscription
    {
        public readonly ITargetBlock<T> Target;
        public readonly bool PropagateCompletion;
        public readonly CancellationTokenSource CancellationSource;

        public Subscription(ITargetBlock<T> target,
            bool propagateCompletion,
            CancellationTokenSource cancellationSource)
        {
            Target = target;
            PropagateCompletion = propagateCompletion;
            CancellationSource = cancellationSource;
        }
    }

    private readonly object _locker = new object();
    private readonly Func<T, T> _cloningFunction;
    private readonly CancellationToken _cancellationToken;
    private readonly ITargetBlock<T> _actionBlock;
    private readonly List<Subscription> _subscriptions = new List<Subscription>();
    private readonly Task _completion;
    private CancellationTokenSource _faultCTS
        = new CancellationTokenSource(); // Is nullified on completion

    public GuaranteedDeliveryBroadcastBlock(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        _cloningFunction = cloningFunction
            ?? throw new ArgumentNullException(nameof(cloningFunction));
        dataflowBlockOptions ??= new DataflowBlockOptions();
        _cancellationToken = dataflowBlockOptions.CancellationToken;

        _actionBlock = new ActionBlock<T>(async item =>
        {
            Task sendAsyncToAll;
            lock (_locker)
            {
                var allSendAsyncTasks = _subscriptions
                    .Select(sub => sub.Target.SendAsync(
                        _cloningFunction(item), sub.CancellationSource.Token));
                sendAsyncToAll = Task.WhenAll(allSendAsyncTasks);
            }
            await sendAsyncToAll;
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
            TaskScheduler = dataflowBlockOptions.TaskScheduler,
        });

        var afterCompletion = _actionBlock.Completion.ContinueWith(t =>
        {
            lock (_locker)
            {
                // PropagateCompletion
                foreach (var subscription in _subscriptions)
                {
                    if (subscription.PropagateCompletion)
                    {
                        if (t.IsFaulted)
                            subscription.Target.Fault(t.Exception);
                        else
                            subscription.Target.Complete();
                    }
                }
                // Cleanup
                foreach (var subscription in _subscriptions)
                {
                    subscription.CancellationSource.Dispose();
                }
                _subscriptions.Clear();
                _faultCTS.Dispose();
                _faultCTS = null; // Prevent future subscriptions to occur
            }
        }, TaskScheduler.Default);

        // Ensure that any exception in the continuation will be surfaced
        _completion = Task.WhenAll(_actionBlock.Completion, afterCompletion);
    }

    public Task Completion => _completion;

    public void Complete() => _actionBlock.Complete();

    void IDataflowBlock.Fault(Exception ex)
    {
        _actionBlock.Fault(ex);
        lock (_locker) _faultCTS?.Cancel();
    }

    public IDisposable LinkTo(ITargetBlock<T> target,
        DataflowLinkOptions linkOptions)
    {
        if (linkOptions.MaxMessages != DataflowBlockOptions.Unbounded)
            throw new NotSupportedException();
        Subscription subscription;
        lock (_locker)
        {
            if (_faultCTS == null) return new Unlinker(null); // Has completed
            var cancellationSource = CancellationTokenSource
                .CreateLinkedTokenSource(_cancellationToken, _faultCTS.Token);
            subscription = new Subscription(target,
                linkOptions.PropagateCompletion, cancellationSource);
            _subscriptions.Add(subscription);
        }
        return new Unlinker(() =>
        {
            lock (_locker)
            {
                // The subscription may have already been removed
                if (_subscriptions.Remove(subscription))
                {
                    subscription.CancellationSource.Cancel();
                    subscription.CancellationSource.Dispose();
                }
            }
        });
    }

    private class Unlinker : IDisposable
    {
        private readonly Action _action;
        public Unlinker(Action disposeAction) => _action = disposeAction;
        void IDisposable.Dispose() => _action?.Invoke();
    }

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return _actionBlock.OfferMessage(messageHeader, messageValue, source,
            consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target, out bool messageConsumed)
            => throw new NotSupportedException();

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => throw new NotSupportedException();

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => throw new NotSupportedException();
}

Отсутствующие функции: IReceivableSourceBlock<T> интерфейс не реализован, и связь с MaxMessages опция не поддерживается.

Этот класс потокобезопасен.

Другие вопросы по тегам