Создание источника асинхронного потока
У меня есть дорогой метод для вызова пакета исходных элементов:
private Task<List<SourceItem>> GetUnprocessedBatch(int batchSize)
{
//impl
}
Я хочу заполнять новые элементы только тогда, когда нет элементов для обработки (или они опускаются ниже определенного порога). Я не мог понять, какой метод Source использовать до сих пор.
Я реализовал грубый поток, который будет возвращать новые элементы:
public class Stream
{
private readonly Queue<SourceItem> scrapeAttempts;
private int batchSize = 100;
private int minItemCount = 10;
public Stream()
{
scrapeAttempts = new Queue<SourceItem>();
}
public async Task<SourceItem> Next()
{
if (scrapeAttempts.Count < minItemCount)
{
var entryScrapeAttempts = await GetUnprocessedBatch(batchSize);
entryScrapeAttempts.ForEach(attempt => scrapeAttempts.Enqueue(attempt));
}
return scrapeAttempts.Dequeue();
}
}
Я ожидал Source.Task
будет работать, но, похоже, это вызывает только один раз. Как я могу создать источник для этого сценария?
1 ответ
Итак, концептуально, что вам нужно, это этап Source, который асинхронно выбирает элементы в пакетах, буферизует пакет и распространяет события вниз по течению один за другим. Когда буфер близок к тому, чтобы быть пустым, мы хотим быстро вызвать следующую выборку в боковом потоке (но не более одного раза), чтобы он мог завершиться, пока мы очищаем текущий пакет.
Такое поведение потребует создания собственного GraphStage. Тот, который может выглядеть так:
sealed class PreFetch<T> : GraphStage<SourceShape<T>>
{
private readonly int threshold;
private readonly Func<Task<IEnumerable<T>>> fetch;
private readonly Outlet<T> outlet = new Outlet<T>("prefetch");
public PreFetch(int threshold, Func<Task<IEnumerable<T>>> fetch)
{
this.threshold = threshold;
this.fetch = fetch;
this.Shape = new SourceShape<T>(this.outlet);
}
public override SourceShape<T> Shape { get; }
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
private sealed class Logic : GraphStageLogic
{
public Logic(PreFetch<T> stage) : base(stage.Shape)
{
// queue for batched elements
var queue = new Queue<T>();
// flag which indicates, that pull from downstream was made,
// but we didn't have any elements at that moment
var wasPulled = false;
// determines if fetch was already called
var fetchInProgress = false;
// in order to cooperate with async calls without data races,
// we need to register async callbacks for success and failure scenarios
var onSuccess = this.GetAsyncCallback<IEnumerable<T>>(batch =>
{
foreach (var item in batch) queue.Enqueue(item);
if (wasPulled)
{
// if pull was requested but not fulfilled, we need to push now, as we have elements
// it assumes that fetch returned non-empty batch
Push(stage.outlet, queue.Dequeue());
wasPulled = false;
}
fetchInProgress = false;
});
var onFailure = this.GetAsyncCallback<Exception>(this.FailStage);
SetHandler(stage.outlet, onPull: () => {
if (queue.Count < stage.threshold && !fetchInProgress)
{
// if queue occupation reached bellow expected capacity
// call fetch on a side thread and handle its result asynchronously
stage.fetch().ContinueWith(task =>
{
// depending on if task was failed or not, we call corresponding callback
if (task.IsFaulted || task.IsCanceled)
onFailure(task.Exception as Exception ?? new TaskCanceledException(task));
else onSuccess(task.Result);
});
fetchInProgress = true;
}
// if queue is empty, we cannot push immediatelly, so we only mark
// that pull request has been made but not fulfilled
if (queue.Count == 0)
wasPulled = true;
else
{
Push(stage.outlet, queue.Dequeue());
wasPulled = false;
}
});
}
}
}