LinkedBlockingQueue возвращает только один из нескольких потоков

Я сделал класс, который считает слова в заданных файлах в том же каталоге. Поскольку файлы очень большие, я решил добиться подсчета нескольких файлов, используя несколько потоков.

При запуске DriverClass, как указано ниже, он застревает в потоке один. Что я делаю неправильно? Поскольку я перебираю queue.take(), можно было бы ожидать, что парсер будет ждать, пока что-то извлечет и пойдет дальше. Застревание в потоке 1 заставляет меня подозревать ошибку при помещении () в очередь.

Заранее спасибо!

DriverClass:

public class WordCountTest {
    public static void main(String[] args){
        if (args.length<1){
            System.out.println("Please specify, atleast, one file");
        }
        BlockingQueue<Integer> threadQueue = new LinkedBlockingQueue<>();
        Runnable r;
        Thread t;

        for (int i = 0; i<args.length; i++){
            r = new WordCount(args[i], threadQueue);
            t = new Thread(r);
            t.start();

            int total = 0;
            for (int k = 0; k<args.length; k++){
                try {
                    total += threadQueue.take();
                } catch (InterruptedException e){
                }
            }
            System.out.println("Total wordcount: " + total);
        }
    }
}

WordCountClass:

public class WordCount implements Runnable {
    private int myId = 0;
    private String _file;
    private BlockingQueue<Integer> _queue;
    private static int id = 0;

    public WordCount(String file, BlockingQueue<Integer> queue){
        _queue = queue;
        _file = file;
        myId = ++id;
    }

    @Override
    public void run() {
        System.out.println("Thread " + myId + " running");
        try {
            _queue.put(countWord(_file));
        } catch (InterruptedException e){

        }
    }

    public int countWord(String file){
        int count = 0;
        try {
            Scanner in = new Scanner(new FileReader(file));
            while (in.hasNext()){
                count++;
                in.next();
            }
        } catch (IOException e){
            System.out.println("File," + file + ",not found");
        }
        return count;
    }
}

2 ответа

Решение

Проблема в том, что вы используете вложенный цикл, когда вы должны использовать два отдельных цикла: один для запуска WordCountsеще один, чтобы собрать результаты, что-то вроде

public class WordCountTest {
    public static void main(String[] args){
        Queue<Integer> threadQueue = new ConcurrentLinkedQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        CountDownLatch latch = new CountDownLatch(args.length);

        for (int i = 0; i<args.length; i++){
            CompletableFuture.runAsync(new WordCount(args[i], threadQueue), executor)
                .thenRunAsync(latch.countDown(), executor);
        }

        latch.await();

        int sum = 0;
        for(Integer i : threadQueue) {
            sum += i;
        }
    }
}

Или как бы вы ни хотели реализовать это, суть в том, что вы не должны начинать собирать результаты, пока все WordCounts началось.

Вы ждете всех результатов после запуска первого потока. Возможно, вы намеревались дождаться результатов после запуска всех потоков.

Примечание: если вы создаете больше потоков, чем у вас процессоров, это может быть медленнее. Я предлагаю вместо этого использовать фиксированный пул потоков.

Другие вопросы по тегам