Может ли PLINQ генерировать два потока? Поток ошибок и поток данных
У меня есть большой (700 МБ +) файл CSV, который я обрабатываю с помощью PLINQ. Вот запрос:
var q = from r in ReadRow(src).AsParallel()
where BoolParser.Parse(r[vacancyIdx])
select r[apnIdx];
Он генерирует список APN для свободных объектов, если вам интересно.
У меня вопрос, как я могу извлечь поток "плохих записей", не делая 2 прохода на запрос / поток?
Каждая строка в файле CSV должна содержать записи colCount. Я хотел бы обеспечить это, изменив предложение where на "где r.Count == colCount && BoolParser.Parse(r[vacancyIdx])".
Но затем любой искаженный ввод будет тихо исчезать.
Мне нужно захватить любые некорректные строки в журнале ошибок и отметить, что n строк ввода не были обработаны.
В настоящее время я делаю эту работу в функции ReadRow(), но кажется, что должен быть простой способ разделить поток данных на 2 или более потоков для обработки.
Кто-нибудь знает, как это сделать? Если нет, кто-нибудь знает, как добавить это предложение в запросы новых функций PLINQ?;-)
1 ответ
То, что вы просите, не имеет особого смысла, потому что PLINQ основан на модели "тянуть" (то есть потребитель решает, когда потреблять товар). Рассмотрим код наподобие (для краткости используем синтаксис кортежа C# 7):
var (good, bad) = ReadRow(src).AsParallel().Split(r => r.Count == colCount);
foreach (var item in bad)
{
// do something
}
foreach (var item in good)
{
// do something else
}
Реализация Split
имеет два варианта:
Блокировать один поток, когда текущий элемент принадлежит другому потоку.
В приведенном выше примере это приведет к взаимоблокировке, как только появится первый хороший элемент.
Кэшируйте значения одного потока во время чтения другого потока.
В приведенном выше примере, если предположить, что подавляющее большинство элементов являются хорошими, это приведет к тому, что около 700 МБ ваших данных будут храниться в памяти в момент между двумя
foreach
петли. Так что это тоже нежелательно.
Итак, я думаю, что ваше решение сделать это в ReadRow
хорошо.
Другой вариант будет что-то вроде:
where CheckCount(r) && BoolParser.Parse(r[vacancyIdx])
Здесь CheckCount
Метод сообщает о найденных ошибках и возвращает false
для них. (Если вы сделаете это, убедитесь, что отчетность защищена от потоков.)
Если вы все еще хотите предложить добавить что-то подобное в PLINQ или просто обсудить варианты, вы можете создать проблему в репозитории corefx.