LinkedBlockingQueue возвращает только один из нескольких потоков
Я сделал класс, который считает слова в заданных файлах в том же каталоге. Поскольку файлы очень большие, я решил добиться подсчета нескольких файлов, используя несколько потоков.
При запуске DriverClass, как указано ниже, он застревает в потоке один. Что я делаю неправильно? Поскольку я перебираю queue.take(), можно было бы ожидать, что парсер будет ждать, пока что-то извлечет и пойдет дальше. Застревание в потоке 1 заставляет меня подозревать ошибку при помещении () в очередь.
Заранее спасибо!
DriverClass:
public class WordCountTest {
public static void main(String[] args){
if (args.length<1){
System.out.println("Please specify, atleast, one file");
}
BlockingQueue<Integer> threadQueue = new LinkedBlockingQueue<>();
Runnable r;
Thread t;
for (int i = 0; i<args.length; i++){
r = new WordCount(args[i], threadQueue);
t = new Thread(r);
t.start();
int total = 0;
for (int k = 0; k<args.length; k++){
try {
total += threadQueue.take();
} catch (InterruptedException e){
}
}
System.out.println("Total wordcount: " + total);
}
}
}
WordCountClass:
public class WordCount implements Runnable {
private int myId = 0;
private String _file;
private BlockingQueue<Integer> _queue;
private static int id = 0;
public WordCount(String file, BlockingQueue<Integer> queue){
_queue = queue;
_file = file;
myId = ++id;
}
@Override
public void run() {
System.out.println("Thread " + myId + " running");
try {
_queue.put(countWord(_file));
} catch (InterruptedException e){
}
}
public int countWord(String file){
int count = 0;
try {
Scanner in = new Scanner(new FileReader(file));
while (in.hasNext()){
count++;
in.next();
}
} catch (IOException e){
System.out.println("File," + file + ",not found");
}
return count;
}
}
2 ответа
Проблема в том, что вы используете вложенный цикл, когда вы должны использовать два отдельных цикла: один для запуска WordCounts
еще один, чтобы собрать результаты, что-то вроде
public class WordCountTest {
public static void main(String[] args){
Queue<Integer> threadQueue = new ConcurrentLinkedQueue<>();
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CountDownLatch latch = new CountDownLatch(args.length);
for (int i = 0; i<args.length; i++){
CompletableFuture.runAsync(new WordCount(args[i], threadQueue), executor)
.thenRunAsync(latch.countDown(), executor);
}
latch.await();
int sum = 0;
for(Integer i : threadQueue) {
sum += i;
}
}
}
Или как бы вы ни хотели реализовать это, суть в том, что вы не должны начинать собирать результаты, пока все WordCounts
началось.
Вы ждете всех результатов после запуска первого потока. Возможно, вы намеревались дождаться результатов после запуска всех потоков.
Примечание: если вы создаете больше потоков, чем у вас процессоров, это может быть медленнее. Я предлагаю вместо этого использовать фиксированный пул потоков.