Использование Polly с потоком данных TPL

Трубопроводы обработки данных и временная обработка ошибок, кажется, идут рука об руку, поэтому мне интересно узнать, смогу ли я получить 2 из лучших библиотек для них - TPL Dataflow и Polly, соответственно - для приятного взаимодействия.

В качестве отправной точки я хотел бы применить политику обработки ошибок к ActionBlock, В идеале я хотел бы заключить его в метод создания блока с такой подписью:

ITargetBlock<T> CreatePollyBlock<T>(
    Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)

Было бы достаточно просто policy.Execute действие изнутри ActionBlock, но у меня есть эти 2 требования:

  1. В случае повторной попытки я не хочу, чтобы повторный элемент имел приоритет над другими элементами, находящимися в очереди. Другими словами, когда вы терпите неудачу, вы переходите на задний план.
  2. Что еще более важно, если перед повторной попыткой есть период ожидания, я не хочу, чтобы это блокировало вход новых элементов. И если ExecutionDataflowBlockOptions.MaxDegreeOfParallelism установлено, я не хочу, чтобы элементы, ожидающие повторения, "посчитали" против этого максимума.

Чтобы удовлетворить эти требования, я считаю, что мне нужен "внутренний" ActionBlock с предоставленным пользователем ExecutionDataflowBlockOptions Применяется, и некоторый "внешний" блок, который отправляет элементы во внутренний блок и применяет любую логику ожидания и повтора (или то, что диктует политика) вне контекста внутреннего блока. Вот моя первая попытка:

// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
    private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();

    public T Data { get; set; }
    public Task Completion => _tcs.Task;

    public void SetCompleted() => _tcs.SetResult(0);
    public void SetFailed(Exception ex) => _tcs.SetException(ex);
}

ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
    // create a block that marks WorkItems completed, and allows
    // items to fault without faulting the entire block.
    var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
        try {
            act(wi.Data);
            wi.SetCompleted();
        }
        catch (Exception ex) {
            wi.SetFailed(ex);
        }
    }, opts);

    return new ActionBlock<T>(async x => {
        await policy.ExecuteAsync(async () => {
            var workItem = new WorkItem<T> { Data = x };
            await innerBlock.SendAsync(workItem);
            await workItem.Completion;
        });
    });
}

Чтобы проверить это, я создал блок с политикой ожидания и повтора и фиктивным методом, который выдает исключение первые 3 раза, когда он вызывается (для всего приложения). Затем я дал ему некоторые данные:

"a", "b", "c", "d", "e", "f"

Я ожидаю, что a, b и c потерпят неудачу и уйдут в конец строки. Но я заметил, что они попали в действие внутреннего блока в следующем порядке:

"a", "a", "a", "a", "b", "c", "d", "e", "f"

По сути, я не выполнил свои собственные требования, и довольно легко понять почему: внешний блок не пропускает новые элементы, пока не будут выполнены все повторные попытки текущего элемента. Простое, но, казалось бы, хакерское решение - добавить большой MaxDegreeOfParallelism значение для внешнего блока:

return new ActionBlock<T>(async x => {
    await policy.ExecuteAsync(async () => {
        var workItem = new WorkItem<T> { Data = x };
        await innerBlock.SendAsync(workItem);
        await workItem.Completion;
    });
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });

С этим изменением я заметил, что новые вещи действительно появляются до повторных попыток, но я также развязал некоторый хаос. Вход во внутренний блок стал случайным, хотя первые 3 элемента всегда находятся в конце:

"a", "e", "b", "c", "d", "a", "e", "b"

Так что это немного лучше. Но в идеале я бы хотел, чтобы порядок сохранился:

"a", "b", "c", "d", "e", "a", "b", "c"

Это то, где я застрял, и рассуждая об этом, я задаюсь вопросом, возможно ли это даже при этих ограничениях, особенно о том, что внутренности CreatePollyBlock может выполнить политику, но не может определить ее. Если бы эти внутренности могли, например, обеспечить повторную лямбду, я думаю, это дало бы мне гораздо больше возможностей. Но это часть определения политики, и я не могу этого сделать.

Заранее благодарю за любую помощь.

0 ответов

Другие вопросы по тегам