Как использовать TaskCompletionSource
У меня ситуация с несколькими производителями и одним потребителем. Я выбрал общий thread-safe
ресурс, в котором все производители Enqueue
Предметы. Однако я не знаю, как эффективно сделать производителя await
для новых предметов, когда Dequeue
из этого ресурса.
ПОКО
struct Sample
{
public int Id { get; set; }
public double Value { get; set; }
}
Производители
class ProducerGroup
{
StorageQueue sink;
int producerGroupSize;
public ProducerGroup(StorageQueue _sink,int producers)
{
this.sink = _sink;
this.producerGroupSize = producers;
}
public void StartProducers()
{
Task[] producers = new Task[producerGroupSize];
int currentProducer;
for (int i = 0; i < producerGroupSize; i++)
{
currentProducer = i;
producers[i] = Task.Run(async () =>
{
int cycle = 0;
while (true)
{
if (cycle > 5)
{
cycle = 0;
}
Sample localSample = new Sample { Id = currentProducer, Value = cycle++ };
await Task.Delay(1000);
this.sink.Enqueue(localSample);
}
});
}
}
}
Место хранения
class StorageQueue
{
private TaskCompletionSource<Sample> tcs;
private object @lock = new object();
private Queue<Sample> queue;
public static StorageQueue CreateQueue(int?size=null)
{
return new StorageQueue(size);
}
public StorageQueue(int ?size)
{
if (size.HasValue)
{
this.queue = new Queue<Sample>(size.Value);
}
else
this.queue = new Queue<Sample>();
}
public void Enqueue(Sample value)
{
lock (this.@lock)
{
this.queue.Enqueue(value);
tcs = new TaskCompletionSource<Sample>();
tcs.SetResult(this.queue.Peek());
}
}
public async Task<Sample> DequeueAsync()
{
var result=await this.tcs.Task;
this.queue.Dequeue();
return result;
}
}
потребитель
class Consumer
{
private StorageQueue source;
public Consumer(StorageQueue _source)
{
this.source = _source;
}
public async Task ConsumeAsync()
{
while (true)
{
Sample arrivedSample = await this.source.DequeueAsync(); //how should i implement this ?
Console.WriteLine("Just Arrived");
}
}
}
Как вы можете видеть в Consumer
класс я хотел бы обернуть Storage's method
Dequeuein a
задачаso that i can
Ждитеit in my
потребитель.
The only reason i used
TaskCompletionSourceis to be able to communicate between the
Dequeueand
Ставитьmethods in the
Storage`.
Я не знаю, нужно ли мне повторно инициализировать tcs
но я полагаю, что я делаю, так как я хочу новый Task
после каждого Enqueue
операция.
Я также повторно инициализировал tcs
внутри lock
так как я хочу, чтобы этот конкретный экземпляр установил результат.
Как мне поступить с этим? Реализация в порядке? Было бы System.Reactive
предложить лучший вариант?
1 ответ
Я думаю, что вижу несколько проблем с вашей реализацией:
- Если вы позвоните
Enqueue()
несколько раз без звонкаDequeueAsync()
, вы потеряетеTaskCompletionSource
и иметь только последний. Затем, позвонивDequeueAsync()
не даст правильных результатов после первого звонка.
Чтобы это исправить, вам понадобится очередь из TaskCompletionSource
это для этого. Посмотрите здесь и дальше здесь.
Лучше использовать SemaphoreSlim.WaitAsync() в DequeueAsync
правильно ждать, если очередь пуста.
- Вы также не защищаете свою очередь в
DequeueAsync()
,