Как писать многопоточный код и накапливать выходные данные всех потоков в одном файле

Этот вопрос чем-то похож на Создание существующего кода в Java параллельным / многопоточным

Я не нашел конкретного ответа на свой вопрос, поэтому я публикую ниже.

Я пытаюсь сделать существующее приложение многопоточным, чтобы уменьшить время выполнения.

Вот моя (для краткости) упрощенная версия существующего кода приложения

for(MyClass myObject : myObjectSet) {              
    String outputString=myLongRunningMethod(myObject);   
    fileWriter.append(outputString+"\n");
}

fileWriter.close();    

Здесь я попытался сделать его многопоточным. myLongRunningMethod самая медленная часть

ExecutorService threadExec = Executors.newFixedThreadPool(myObjectSet.size());

// Семафорная реализация ResourcePool resourcePool = new ResourcePool(myObjectSet.size());

for(MyClass myObject:myObjectSet) {             
       Object key = resourcePool.getItem();      

       MyClassMT myClassMT = new MyClassMT(myObject);
       threadExec.execute(myClassMT);                      
 }

Где MyClassMT находится:

public class MyClassMT implements Runnable{

    MyClass myObject;

    public MyClassMT(MyClass myObject) {
        this.myObject=myObject
    }

    @Override
    public void run() {
       String outString= myLongRunningMethod(this.myObject);
       System.out.println(outString);   
    }
}

Проблема / Вопрос

Моя попытка многопоточного кода, кажется, работает нормально, потому что я вижу ожидаемый вывод в консоли, но я не могу более безопасно записать вывод myLongRunningMethod подать с помощью fileWriter, Кроме того, я могу видеть в профилировщике, что все потоки все еще живы даже после того, как они сделали обработку myObject

Как записать вывод в файл после обработки всех элементов в myObjectSet. то есть, чтобы вернуть мою первоначальную функциональность многопоточным способом. А потом остановите все темы.

Есть ли более легкая / лучшая реализация? Может быть, тот, который включает в себя фьючерсы гуавы? Мне действительно нужно использовать MyResource, который похож на код здесь Семафор

Кстати, я пытался установить outString как поле MyClassMT и попытался вернуть его в код после того, как сразу послеthreadExec.execute(myClassMT);, это не сработало.

Не показано в коде, новый fileWriter будет создан для каждого myObjectSet.

Пожалуйста, дайте мне знать, если вам нужно больше информации об этом.

2 ответа

Решение

Как записать вывод в файл после обработки всех элементов в myObjectSet. то есть, чтобы вернуть мою первоначальную функциональность многопоточным способом.

С использованием Future<String> это правильный способ сделать это. Вам нужно включить MyClassMT в Callable<String> и использовать threadExec.submit(myCallable), Это возвращает Future<String> который после отправки всех ваших задач может быть использован для получения результатов работы каждого потока.

public class MyClassMT implements Callable<String> {
    ...
    public String call() {
       ...
    }

Вы также можете использовать threadExec.invokeAll(...) призвать все ваши Callable<String> классы. Это возвращает List<Future<String>>,

Тогда вы можете сделать что-то вроде:

List<Future<String>> futures = threadExec.invokeAll(myClassMTCollection);
// always shutdown the pool once you are done submitting
threadExec.shutdown();
for (Future<String> future : futures) {
    // this can throw an exception that the thread threw
    String result = future.get();
}

А потом остановите все темы.

После того как вы отправили все задания, вам нужно позвонить shutdown() в бассейне. Отправленные задания продолжают выполняться, но после выполнения заданий потоки будут отключены. Если вы этого не сделаете, ваше приложение никогда не закончится.

ExecutorService threadExec = Executors.newFixedThreadPool (myObjectSet.size ());

Если вы делаете что-то подобное, то вам действительно следует использовать Executors.newCachedThreadPool() который будет разветвлять новый поток всякий раз, когда это необходимо. Действительно, если ваши потоки интенсивно используют процессор, вы должны выбрать какое-то число из числа ядер с фиксированным пулом потоков, а не выделять новый поток для каждой задачи.

Я бы сказал, что это просто еще один вариант стандартной (множественной) проблемы производителя (-ов)-потребителя в многопоточности. Есть много разных решений для такого рода проблем, я бы предпочел подход с очередью сообщений в этом случае - но это только мое личное предпочтение.

Другие вопросы по тегам