Отложенная запись в DataFlow
При публикации элемента в TPL DataFlow
Есть ли какой-нибудь механизм, который может позволить задержку сообщения?
public partial class BasicDataFlowService
{
private readonly ActionBlock<string> workerBlock;
public BasicDataFlowService()
{
workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 32
});
}
partial void DoWork(string fileName);
private void AddToDataFlow(string file)
{
workerBlock.Post(file);
}
}
В AddToDataFlow
Я хотел бы иметь возможность указать задержку перед обработкой элемента (например, если мы решили, что хотим отложить обработку на 30 секунд).
Я подумал об использовании TransFormBlock
с new System.Threading.ManualResetEvent(false).WaitOne(1000);
например,
var requeueBlock = new TransformBlock<string, string>(file =>
{
new System.Threading.ManualResetEvent(false).WaitOne(1000);
return file;
});
requeueBlock.LinkTo(workerBlock);
Однако может показаться, что это излишне потребляет поток, который может использоваться другими блоками в цепочке.
2 ответа
Прежде всего, вам нужно хранить ManualResetEvent
как синглтон, иначе у всех потоков будет свой собственный объект для ожидания, и ваш подход не будет работать.
Во-вторых, рассмотрим ManualResetEventSlim
версия вместо тяжелой ManualResetEvent
, если вам нужно сделать синхронизацию внутри одного AppDomain
в вашем трубопроводе.
Если вы хотите повторно использовать ядра вашей машины без бесполезного ожидания, вы должны изучить SpinWait
легкая конструкция. Статья Джозефа Албахари может оказаться полезной в этом случае:
// singleton variable
bool _proceed;
var requeueBlock = new TransformBlock<string, string>(file =>
{
var spinWait = new SpinWait();
while (!_proceed)
{
// ensure we have the latest _proceed value
Thread.MemoryBarrier();
// try to spin for a while
// after some spins, yield to another thread
spinWait.SpinOnce();
}
return file;
});
SpinWait
внутренне решает, как дать: с Sleep(0)
, Sleep(1)
или же Yield
вызовы методов, так что это довольно эффективно для вашего случая.
Чтобы добавить задержку перед публикацией значения в workerBlock
Вы можете просто вставить задержку и подождать ее, прежде чем публиковать значение. Если твой workerBlock
имеет ограниченную емкость, вы можете await SendAsync
, Несколько вариантов для достижения цели:
private async Task AddToDataflow(string file, TimeSpan delay) {
await Task.Delay(delay);
await workerBlock.SendAsync(file);
}
private async Task AddToDataflow(string file) {
var delay = TimeSpan.FromSeconds(30);
await Task.Delay(delay);
await workerBlock.SendAsync(file);
}
private async void AddToDataflow(string file) {
var delay = TimeSpan.FromSeconds(30);
await Task.Delay(delay);
workerBlock.Post(file);
}