Использование Polly с потоком данных TPL
Трубопроводы обработки данных и временная обработка ошибок, кажется, идут рука об руку, поэтому мне интересно узнать, смогу ли я получить 2 из лучших библиотек для них - TPL Dataflow и Polly, соответственно - для приятного взаимодействия.
В качестве отправной точки я хотел бы применить политику обработки ошибок к ActionBlock
, В идеале я хотел бы заключить его в метод создания блока с такой подписью:
ITargetBlock<T> CreatePollyBlock<T>(
Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)
Было бы достаточно просто policy.Execute
действие изнутри ActionBlock
, но у меня есть эти 2 требования:
- В случае повторной попытки я не хочу, чтобы повторный элемент имел приоритет над другими элементами, находящимися в очереди. Другими словами, когда вы терпите неудачу, вы переходите на задний план.
- Что еще более важно, если перед повторной попыткой есть период ожидания, я не хочу, чтобы это блокировало вход новых элементов. И если
ExecutionDataflowBlockOptions.MaxDegreeOfParallelism
установлено, я не хочу, чтобы элементы, ожидающие повторения, "посчитали" против этого максимума.
Чтобы удовлетворить эти требования, я считаю, что мне нужен "внутренний" ActionBlock
с предоставленным пользователем ExecutionDataflowBlockOptions
Применяется, и некоторый "внешний" блок, который отправляет элементы во внутренний блок и применяет любую логику ожидания и повтора (или то, что диктует политика) вне контекста внутреннего блока. Вот моя первая попытка:
// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();
public T Data { get; set; }
public Task Completion => _tcs.Task;
public void SetCompleted() => _tcs.SetResult(0);
public void SetFailed(Exception ex) => _tcs.SetException(ex);
}
ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
// create a block that marks WorkItems completed, and allows
// items to fault without faulting the entire block.
var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
try {
act(wi.Data);
wi.SetCompleted();
}
catch (Exception ex) {
wi.SetFailed(ex);
}
}, opts);
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
});
}
Чтобы проверить это, я создал блок с политикой ожидания и повтора и фиктивным методом, который выдает исключение первые 3 раза, когда он вызывается (для всего приложения). Затем я дал ему некоторые данные:
"a", "b", "c", "d", "e", "f"
Я ожидаю, что a, b и c потерпят неудачу и уйдут в конец строки. Но я заметил, что они попали в действие внутреннего блока в следующем порядке:
"a", "a", "a", "a", "b", "c", "d", "e", "f"
По сути, я не выполнил свои собственные требования, и довольно легко понять почему: внешний блок не пропускает новые элементы, пока не будут выполнены все повторные попытки текущего элемента. Простое, но, казалось бы, хакерское решение - добавить большой MaxDegreeOfParallelism
значение для внешнего блока:
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
С этим изменением я заметил, что новые вещи действительно появляются до повторных попыток, но я также развязал некоторый хаос. Вход во внутренний блок стал случайным, хотя первые 3 элемента всегда находятся в конце:
"a", "e", "b", "c", "d", "a", "e", "b"
Так что это немного лучше. Но в идеале я бы хотел, чтобы порядок сохранился:
"a", "b", "c", "d", "e", "a", "b", "c"
Это то, где я застрял, и рассуждая об этом, я задаюсь вопросом, возможно ли это даже при этих ограничениях, особенно о том, что внутренности CreatePollyBlock
может выполнить политику, но не может определить ее. Если бы эти внутренности могли, например, обеспечить повторную лямбду, я думаю, это дало бы мне гораздо больше возможностей. Но это часть определения политики, и я не могу этого сделать.
Заранее благодарю за любую помощь.