Отложенная запись в DataFlow

При публикации элемента в TPL DataFlowЕсть ли какой-нибудь механизм, который может позволить задержку сообщения?

public partial class BasicDataFlowService
{
    private readonly ActionBlock<string> workerBlock;

    public BasicDataFlowService()
    {
        workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 32
        });
    }

    partial void DoWork(string fileName);

    private void AddToDataFlow(string file)
    {
        workerBlock.Post(file);
    }
}

В AddToDataFlowЯ хотел бы иметь возможность указать задержку перед обработкой элемента (например, если мы решили, что хотим отложить обработку на 30 секунд).

Я подумал об использовании TransFormBlock с new System.Threading.ManualResetEvent(false).WaitOne(1000);например,

var requeueBlock = new TransformBlock<string, string>(file =>
{
    new System.Threading.ManualResetEvent(false).WaitOne(1000);
    return file;
});

requeueBlock.LinkTo(workerBlock);

Однако может показаться, что это излишне потребляет поток, который может использоваться другими блоками в цепочке.

2 ответа

Прежде всего, вам нужно хранить ManualResetEvent как синглтон, иначе у всех потоков будет свой собственный объект для ожидания, и ваш подход не будет работать.

Во-вторых, рассмотрим ManualResetEventSlim версия вместо тяжелой ManualResetEvent, если вам нужно сделать синхронизацию внутри одного AppDomain в вашем трубопроводе.

Если вы хотите повторно использовать ядра вашей машины без бесполезного ожидания, вы должны изучить SpinWait легкая конструкция. Статья Джозефа Албахари может оказаться полезной в этом случае:

// singleton variable
bool _proceed;

var requeueBlock = new TransformBlock<string, string>(file =>
{
    var spinWait = new SpinWait();
    while (!_proceed)
    {
        // ensure we have the latest _proceed value
        Thread.MemoryBarrier();
        // try to spin for a while
        // after some spins, yield to another thread
        spinWait.SpinOnce();
    }
    return file;
});

SpinWaitвнутренне решает, как дать: с Sleep(0), Sleep(1) или же Yield вызовы методов, так что это довольно эффективно для вашего случая.

Чтобы добавить задержку перед публикацией значения в workerBlock Вы можете просто вставить задержку и подождать ее, прежде чем публиковать значение. Если твой workerBlock имеет ограниченную емкость, вы можете await SendAsync, Несколько вариантов для достижения цели:

private async Task AddToDataflow(string file, TimeSpan delay) {
    await Task.Delay(delay);
    await workerBlock.SendAsync(file);
}

private async Task AddToDataflow(string file) {
    var delay = TimeSpan.FromSeconds(30);
    await Task.Delay(delay);
    await workerBlock.SendAsync(file);
}

private async void AddToDataflow(string file) {
    var delay = TimeSpan.FromSeconds(30);
    await Task.Delay(delay);
    workerBlock.Post(file);
}
Другие вопросы по тегам