Блок потока данных TPL использует всю доступную память
У меня есть TransformManyBlock
со следующим дизайном:
- Ввод: путь к файлу
- Вывод: IEnumerable содержимого файла, по одной строке за раз
Я запускаю этот блок для огромного файла (61 ГБ), который слишком велик для размещения в оперативной памяти. Чтобы избежать неограниченного роста памяти, я установил BoundedCapacity
очень низкому значению (например, 1) для этого блока и всех последующих блоков. Тем не менее, блок явно перебирает IEnumerable, который потребляет всю доступную память на компьютере, останавливая каждый процесс. OutputCount блока продолжает расти без ограничений, пока я не убью процесс.
Что я могу сделать, чтобы блок не потреблял IEnumerable
в этом случае?
РЕДАКТИРОВАТЬ: Вот пример программы, которая иллюстрирует проблему:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static IEnumerable<string> GetSequence(char c)
{
for (var i = 0; i < 1024 * 1024; ++i)
yield return new string(c, 1024 * 1024);
}
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
var secondBlock = new ActionBlock<string>(str =>
{
Console.WriteLine(str.Substring(0, 10));
Thread.Sleep(1000);
}, options);
firstBlock.LinkTo(secondBlock);
firstBlock.Completion.ContinueWith(task =>
{
if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
else secondBlock.Complete();
});
firstBlock.Post('A');
firstBlock.Complete();
for (; ; )
{
Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
Thread.Sleep(3000);
}
}
}
Если вы используете 64-разрядную версию, обязательно снимите флажок "Предпочитать 32-разрядную версию" в Visual Studio. На моем компьютере 16 ГБ ОЗУ, и эта программа сразу же потребляет все доступные байты.
1 ответ
Вы, кажется, неправильно понимаете, как работает поток данных TPL.
BoundedCapacity
ограничивает количество элементов, которые вы можете разместить в блоке. В вашем случае это означает, что один char
в TransformManyBlock
и один string
в ActionBlock
,
Таким образом, вы размещаете один элемент в TransformManyBlock
который затем возвращает 1024*1024
строки и пытается передать их ActionBlock
который будет принимать только по одному за раз. Остальные струны будут просто сидеть в TransformManyBlock
выходная очередь.
Вероятно, вы захотите создать отдельный блок и разместить в нем элементы в потоковом режиме, ожидая (синхронно или иным образом), когда будет достигнута его емкость:
private static void Main()
{
MainAsync().Wait();
}
private static async Task MainAsync()
{
var block = new ActionBlock<string>(async item =>
{
Console.WriteLine(item.Substring(0, 10));
await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
foreach (var item in GetSequence('A'))
{
await block.SendAsync(item);
}
block.Complete();
await block.Completion;
}
Кажется, что для создания ограниченного вывода TransformManyBlock
, необходимы три внутренних блока:
- А
TransformBlock
который получает ввод и производитIEnumerable
s, потенциально работающие параллельно. - Непараллельный
ActionBlock
который перечисляет произведенныеIEnumerable
s, и распространяет окончательные результаты. - А
BufferBlock
где хранятся окончательные результаты, соблюдая желаемыеBoundedCapacity
.
Немного сложная часть заключается в том, как распространить завершение второго блока, потому что он не связан напрямую с третьим блоком. В реализации ниже методPropagateCompletion
написан по исходному коду библиотеки.
public static IPropagatorBlock<TInput, TOutput>
CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, Task<IEnumerable<TOutput>>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
if (dataflowBlockOptions == null)
throw new ArgumentNullException(nameof(dataflowBlockOptions));
var input = new TransformBlock<TInput, IEnumerable<TOutput>>(transform,
dataflowBlockOptions);
var output = new BufferBlock<TOutput>(dataflowBlockOptions);
var middle = new ActionBlock<IEnumerable<TOutput>>(async results =>
{
if (results == null) return;
foreach (var result in results)
{
var accepted = await output.SendAsync(result).ConfigureAwait(false);
if (!accepted) break; // If one is rejected, the rest will be rejected too
}
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = dataflowBlockOptions.MaxDegreeOfParallelism,
CancellationToken = dataflowBlockOptions.CancellationToken,
SingleProducerConstrained = true,
});
input.LinkTo(middle, new DataflowLinkOptions() { PropagateCompletion = true });
PropagateCompletion(middle, output);
return DataflowBlock.Encapsulate(input, output);
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try
{
await source.Completion.ConfigureAwait(false);
}
catch { }
var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
}
}
// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, IEnumerable<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
item => Task.FromResult(transform(item)), dataflowBlockOptions);
}
Пример использования:
var firstBlock = CreateOutputBoundedTransformManyBlock<char, string>(
c => GetSequence(c), options);
Если коэффициент вывода конвейера ниже, чем коэффициент передачи, сообщения будут накапливаться в конвейере до тех пор, пока не закончится память или не будет достигнут какой-либо предел очереди. Если сообщения имеют значительный размер, процесс скоро будет нуждаться в памяти.
настройка BoundedCapacity
значение 1 приведет к отклонению сообщений в очереди, если в очереди уже есть одно сообщение. Это нежелательное поведение в таких случаях, как, например, пакетная обработка. Проверьте этот пост для понимания.
Этот рабочий тест иллюстрирует мою точку зрения:
//Change BoundedCapacity to +1 to see it fail
[TestMethod]
public void stackOverflow()
{
var total = 1000;
var processed = 0;
var block = new ActionBlock<int>(
(messageUnit) =>
{
Thread.Sleep(10);
Trace.WriteLine($"{messageUnit}");
processed++;
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = -1 }
);
for (int i = 0; i < total; i++)
{
var result = block.SendAsync(i);
Assert.IsTrue(result.IsCompleted, $"failed for {i}");
}
block.Complete();
block.Completion.Wait();
Assert.AreEqual(total, processed);
}
Поэтому мой подход заключается в том, чтобы задушить сообщение, чтобы конвейер не накапливал много сообщений в очередях.
Ниже приведен простой способ сделать это. Таким образом, поток данных продолжает обрабатывать сообщения на полной скорости, но сообщения не накапливаются, что позволяет избежать чрезмерного потребления памяти.
//Should be adjusted for specific use.
public void postAssync(Message message)
{
while (totalPending = block1.InputCount + ... + blockn.InputCount> 100)
{
Thread.Sleep(200);
//Note: if allocating huge quantities for of memory for each message the Garbage collector may keep up with the pace.
//This is the perfect place to force garbage collector to release memory.
}
block1.SendAssync(message)
}