Доступ к файлу через несколько потоков
Я хочу получить доступ к большому файлу (размер файла может варьироваться от 30 МБ до 1 ГБ) через 10 потоков, а затем обработать каждую строку в файле и записать их в другой файл через 10 потоков. Если я использую только один поток для доступа к IO, другие потоки блокируются. Обработка занимает некоторое время, почти эквивалентное чтению строки кода из файловой системы. Существует еще одно ограничение: данные в выходном файле должны быть в том же порядке, что и входной файл.
Я хочу, чтобы ваши мысли о дизайне этой системы. Существует ли существующий API для поддержки одновременного доступа к файлам?
Также запись в тот же файл может привести к тупику.
Пожалуйста, предложите, как этого добиться, если меня беспокоит нехватка времени.
10 ответов
- Вы должны абстрагироваться от чтения файла. Создайте класс, который читает файл и отправляет содержимое в различное количество потоков.
Класс не должен отправлять строки, он должен обернуть их в Line
класс, который содержит метаинформацию, например, номер строки, поскольку вы хотите сохранить исходную последовательность.
Вам нужен класс обработки, который выполняет фактическую работу с собранными данными. В твоем случае работы нет. Класс просто хранит информацию, вы можете когда-нибудь расширить ее, чтобы делать дополнительные вещи (например, перевернуть строку. Добавить некоторые другие строки,...)
Затем вам нужен класс слияния, который выполняет какую-то многопутевую сортировку слиянием в потоках обработки и собирает все ссылки на
Line
экземпляры в последовательности.
Класс слияния может также записывать данные обратно в файл, но для обеспечения чистоты кода...
- Я бы порекомендовал создать выходной класс, который снова абстрагируется от всей обработки файлов и прочего.
Конечно, вам нужно много памяти для этого подхода, если у вас мало оперативной памяти. Вам понадобится потоковый подход, который бы работал на месте, чтобы сохранить небольшую нагрузку на память.
ОБНОВЛЕНИЕ Потоковый подход
Все остается неизменным, кроме:
Reader
поток качает прочитанные данные в Balloon
, Этот шар имеет определенное количество Line
экземпляры, которые он может хранить (чем больше число, тем больше основной памяти вы используете).
Обработка потоков занимает Line
С шарика, читатель качает больше строк в шарик, когда он становится пустее.
Класс слияния берет строки из потоков обработки, как указано выше, и средство записи записывает данные обратно в файл.
Может быть, вы должны использовать FileChannel
в потоках ввода / вывода, поскольку он больше подходит для чтения больших файлов и, вероятно, потребляет меньше памяти при обработке файла (но это всего лишь приблизительное предположение).
Я бы начал с трех потоков.
- поток чтения, который читает данные, разбивает их на "строки" и помещает их в ограниченную очередь блокировки (Q1),
- поток обработки, который читает из Q1, выполняет обработку и помещает их во вторую ограниченную очередь блокировки (Q2), и
- поток записи, который читает из Q2 и записывает на диск.
Конечно, я также хотел бы убедиться, что выходной файл находится на физически другом диске, чем входной файл.
Если обработка, как правило, выполняется медленнее, чем ввод-вывод (отслеживание размеров очереди), вы можете начать экспериментировать с двумя или более параллельными "процессорами", которые синхронизируются в том, как они читают и записывают свои данные.
Любой тип ввода-вывода, будь то диск, сеть и т. Д., Как правило, является узким местом.
Использование нескольких потоков усугубляет проблему, поскольку весьма вероятно, что только один поток может одновременно иметь доступ к ресурсу ввода-вывода.
Лучше всего использовать один поток для чтения, передавать информацию рабочему пулу потоков, а затем писать прямо оттуда. Но опять же, если рабочие пишут в одно и то же место, будут узкие места, поскольку только один может иметь замок. Легко исправить, передав данные в один поток записи.
Короче":
Один поток чтения пишет в BlockingQueue или тому подобное, это дает ему естественную упорядоченную последовательность.
Затем потоки рабочего пула ждут в очереди данные, записывая их порядковый номер.
Затем рабочие потоки записывают обработанные данные в другое BlockingQueue, на этот раз прикрепляя свой оригинальный порядковый номер, чтобы
Поток писателя может взять данные и записать их в последовательности.
Это, вероятно, приведет к максимально быстрой реализации.
Одним из возможных способов будет создание единого потока, который будет читать входной файл и помещать строки чтения в очередь блокировки. Несколько потоков будут ожидать данные из этой очереди, обрабатывать данные.
Другое возможное решение может состоять в том, чтобы разделить файл на блоки и назначить каждый блок отдельному потоку.
Чтобы избежать блокировки, вы можете использовать асинхронный ввод-вывод. Вы также можете взглянуть на шаблон Proactor из Pattern-Oriented Software Architecture, том 2.
Вы можете сделать это, используя FileChannel в Java, который позволяет нескольким потокам обращаться к одному и тому же файлу. FileChannel позволяет читать и писать, начиная с позиции. Смотрите пример кода ниже:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
public class OpenFile implements Runnable
{
private FileChannel _channel;
private FileChannel _writeChannel;
private int _startLocation;
private int _size;
public OpenFile(int loc, int sz, FileChannel chnl, FileChannel write)
{
_startLocation = loc;
_size = sz;
_channel = chnl;
_writeChannel = write;
}
public void run()
{
try
{
System.out.println("Reading the channel: " + _startLocation + ":" + _size);
ByteBuffer buff = ByteBuffer.allocate(_size);
if (_startLocation == 0)
Thread.sleep(100);
_channel.read(buff, _startLocation);
ByteBuffer wbuff = ByteBuffer.wrap(buff.array());
int written = _writeChannel.write(wbuff, _startLocation);
System.out.println("Read the channel: " + buff + ":" + new String(buff.array()) + ":Written:" + written);
}
catch (Exception e)
{
e.printStackTrace();
}
}
public static void main(String[] args)
throws Exception
{
FileOutputStream ostr = new FileOutputStream("OutBigFile.dat");
FileInputStream str = new FileInputStream("BigFile.dat");
String b = "Is this written";
//ostr.write(b.getBytes());
FileChannel chnl = str.getChannel();
FileChannel write = ostr.getChannel();
ByteBuffer buff = ByteBuffer.wrap(b.getBytes());
write.write(buff);
Thread t1 = new Thread(new OpenFile(0, 10000, chnl, write));
Thread t2 = new Thread(new OpenFile(10000, 10000, chnl, write));
Thread t3 = new Thread(new OpenFile(20000, 10000, chnl, write));
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
write.force(false);
str.close();
ostr.close();
}
}
В этом примере три потока читают один и тот же файл и пишут в один и тот же файл и не конфликтуют. Эта логика в этом примере не учитывает, что назначенные размеры не обязательно должны заканчиваться концом строки и т. Д. Вы найдете правильную логику на основе ваших данных.
Имейте в виду, что идеальное количество потоков ограничено аппаратной архитектурой и другими компонентами (вы можете подумать о том, чтобы проконсультироваться с пулом потоков, чтобы рассчитать наилучшее количество потоков). Предполагая, что "10" - хорошее число, мы продолжаем. знак равно
Если вы ищете производительность, вы можете сделать следующее:
Прочитайте файл, используя ваши потоки, и обработайте каждый в соответствии с вашим бизнес-правилом. Оставьте одну переменную управления, которая указывает следующую ожидаемую строку, которая будет вставлена в выходной файл.
Если следующая ожидаемая строка завершена, добавьте ее в буфер (очередь) (было бы идеально, если бы вы могли найти способ вставить напрямую в выходной файл, но у вас были бы проблемы с блокировкой). В противном случае сохраните эту "будущую" строку внутри бинарного дерева поиска, упорядочив дерево по позиции строки. Двоичное дерево поиска дает вам временную сложность "O(log n)" для поиска и вставки, что очень быстро для вашего контекста. Продолжайте заполнять дерево, пока не будет завершена обработка следующей "ожидаемой" строки.
Активирует поток, который будет отвечать за открытие выходного файла, периодическое использование буфера и запись строк в файл.
Также следите за "второстепенным" ожидаемым узлом BST, который будет вставлен в файл. Вы можете использовать его, чтобы проверить, находится ли будущая строка внутри BST, прежде чем начать поиск по ней.
- Когда следующая ожидаемая строка будет обработана, вставьте ее в очередь и убедитесь, что следующий элемент находится внутри дерева двоичного поиска. В случае, если следующая строка находится в дереве, удалите узел из дерева и добавьте содержимое узла в очередь и повторите поиск, если следующая строка уже находится внутри дерева.
- Повторяйте эту процедуру, пока все файлы не будут обработаны, дерево пусто и очередь пуста.
Этот подход использует - O(n) для чтения файла (но распараллеливается) - O(1) для вставки упорядоченных строк в очередь - O(Logn)*2 для чтения и записи дерева двоичного поиска - O (о) написать новый файл
плюс стоимость вашего бизнес-правила и операций ввода-вывода.
Надеюсь, поможет.
Я сталкивался с подобной ситуацией и раньше, и то, как я справился с этим, таково:
Прочитайте файл в главном потоке построчно и передайте обработку строки исполнителю. Разумная отправная точка на ExecutorService здесь. Если вы планируете использовать фиксированное количество потоков, вас может заинтересовать Executors.newFixedThreadPool(10)
заводской метод в Executors
учебный класс. Javadocs на эту тему тоже неплох.
По сути, я отправляю все задания, вызываю выключение, а затем в основном потоке продолжаю записывать в выходной файл в порядке для всех Future
которые возвращаются. Вы можете использовать Future
учебный класс' get()
природа метода блокирования, чтобы гарантировать порядок, но вы действительно не должны использовать многопоточность для записи, так же как вы не будете использовать его для чтения. Имеет смысл?
Тем не мение, 1 GB
Дата файлы? Если бы я был тобой, я бы сначала заинтересовался осмысленным разбором этих файлов.
PS: Я сознательно избегал кода в ответе, так как я хотел бы, чтобы ОП попробовал это сам. Достаточно указателей на конкретные классы, методы API и пример.
Весенняя партия приходит на ум.
Поддержание порядка потребует шага постобработки, т.е. сохраните индекс / ключ чтения, упорядоченный в контексте обработки. Логика обработки должна также хранить обработанную информацию в контексте. После завершения обработки вы можете после этого обработать список и записать в файл.,
Остерегайтесь вопросов OOM все же.
Я сталкивался с подобной проблемой в прошлом. Где я должен прочитать данные из одного файла, обработать его и записать результат в другой файл. Так как обработка части была очень тяжелой. Поэтому я попытался использовать несколько потоков. Вот дизайн, которому я следовал, чтобы решить мою проблему:
- Используйте основную программу как мастер, прочитайте весь файл за один раз (но не начинайте обработку). Создайте один объект данных для каждой строки с ее порядком следования.
- Используйте одну приоритетную очередь, скажем очередь в main, добавьте эти объекты данных в нее. Поделитесь ссылкой на эту очередь в конструкторе каждого потока.
- Создайте разные процессоры, т.е. потоки, которые будут прослушивать эту очередь. Когда мы добавляем объекты данных в эту очередь, мы вызываем метод notifyall. Все темы будут обрабатываться индивидуально.
- После обработки поместите все результаты в одну карту и сравните результаты с ключом в качестве порядкового номера.
- Когда очередь пуста и все потоки свободны, значит, обработка завершена. Остановите темы. Итерировать по карте и записывать результаты в файл
Поскольку порядок должен поддерживаться, сама проблема говорит о том, что чтение и запись не могут выполняться параллельно, поскольку это последовательный процесс, единственное, что вы можете сделать параллельно, - это обработка записей, но это также не решает много проблем только с одним писателем.,
Вот дизайнерское предложение:
- Используйте One Thread t1 для чтения файла и сохранения данных в LinkedBlockingQueue Q1
- Используйте другой поток t2, чтобы прочитать данные из Q1 и поместить в другой LinkedBlockingQueue Q2
- Поток t3 читает данные из Q2 и записывает в файл.
- Чтобы убедиться, что вы не столкнулись с OutofMemoryError, вы должны инициализировать очереди с соответствующим размером
- Вы можете использовать CyclicBarrier, чтобы гарантировать, что все потоки завершают свою работу
- Кроме того, вы можете установить действие в CyclicBarrier, где вы можете выполнять свои задачи постобработки.
Удачи, надеясь, что вы получите лучший дизайн.
Ура!!