Основной поток случайно не достигает конца (пытаясь суммировать натуральные числа в параллельных потоках)

У меня есть следующий код, чтобы найти сумму натуральных чисел от 1 до 5000. Это простое упражнение для практики параллелизма.

public static void main(String[] args) throws InterruptedException {
        final int[] threadNb = new int[] {5};
        final Integer[] result = new Integer[1];
        result[0] = 0;
        List<Thread> threads = new LinkedList<>();
        IntStream.range(0, threadNb[0]).forEach(e -> {
            threads.add(new Thread(() -> {
                int sum = 0;
                int idx = e * 1000 + 1;
                while (!Thread.interrupted()) {
                    if (idx <= (e + 1) * 1000) {
                        sum += idx++;
                    } else {
                        synchronized(result) {
                            result[0] += sum;
                            System.err.println("sum found (job " + e + "); sum=" + sum + "; result[0]=" + result[0] + "; idx=" + idx);
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                synchronized(result) {
                    System.err.println("Job " + e + " done. threadNb = " + threadNb[0]);
                    threadNb[0]--;
                    System.err.println("threadNb = " + threadNb[0]);
                }
            }));
        });
        threads.forEach(Thread::start);
        //noinspection StatementWithEmptyBody
        while(threadNb[0] > 0);
        System.out.println("begin result");
        System.out.println(result[0]);
        System.out.println("end result");
    }

Иногда, когда я запускаю код, последние 3 System.out.println() не отображаются. Если я положу заявление в while(threadNb[0] > 0)как другой System.out.println()Моя проблема никогда не повторится.

Может кто-нибудь объяснить мне это поведение?

Заранее благодарю за любую помощь

2 ответа

Решение

Ничто в том, как объявляется переменная threadNb, не говорит JVM, что она должна делать обновления, видимые для других потоков. В какой момент обновления вашей переменной становятся видимыми для других потоков, полностью зависит от реализации JVM, она может сделать их видимыми или нет в зависимости от обстоятельств. Кроме того, JIT может свободно переупорядочивать или оптимизировать код, если считает, что ему это сойдет с рук, и основывает свои решения на правилах видимости. Поэтому сложно сказать, что именно происходит здесь, потому что поведение не определено спецификациями языка Java, но определенно у вас возникла проблема, когда обновления ваших рабочих потоков обычно не замечаются основным потоком.

Если вы замените массивы на AtomicInteger, то обновления гарантированно станут видимыми для других потоков. (Volatile тоже работает, но AtomicInteger предпочтительнее. Чтобы использовать volatile, необходимо сделать переменную экземпляром или членом класса.) Если вы сохраняете обновленные значения в локальной переменной, синхронизация не требуется:

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.atomic.*;    
public class SumNumbers {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger threadNb = new AtomicInteger(5);
        AtomicInteger result = new AtomicInteger(0);
        List<Thread> threads = new LinkedList<>();
        IntStream.range(0, threadNb.intValue()).forEach(e -> {
            threads.add(new Thread(() -> {
                int sum = 0;
                int idx = e * 1000 + 1;
                while (!Thread.currentThread().isInterrupted()) {
                    if (idx <= (e + 1) * 1000) {
                        sum += idx++;
                    } else {
                        int r = result.addAndGet(sum);
                        System.out.println("sum found (job " + e + "); sum=" 
                        + sum + "; result=" + r 
                        + "; idx=" + idx);
                        Thread.currentThread().interrupt();
                    }
                }
                System.out.println("Job " + e + " done.");
                int threadNbVal = threadNb.decrementAndGet();
                System.out.println("Job " + e + " done, threadNb = " + threadNbVal);
            }));
        });
        threads.forEach(Thread::start);
        //noinspection StatementWithEmptyBody
        while(threadNb.intValue() > 0);
        System.out.println("result=" + result.intValue());
    }
}

где вы можете увидеть обновления становятся видимыми.

Оживленное ожидание не является предпочтительным, поскольку оно тратит время процессора. Вы видите это в программировании без блокировки, но это не очень хорошая вещь здесь. Thread#join будет работать или вы можете использовать CountdownLatch.

Помните, что используя Thread#interrupted() очищает прерванный флаг Обычно он используется, когда вы собираетесь выбросить InterruptedException, в противном случае лучше использовать Thread.currentThread().isInterrupted(), В этом конкретном случае это не повредит, потому что проверка цикла while - единственное, что использует флаг, поэтому не имеет значения, очищен ли он.

Наиболее вероятным объяснением является то, что компилятор оптимизирует ваш код так, чтобы значение threadNb[0] кэшируется Поэтому основной поток может не видеть обновления, сделанные другими потоками. Делать ваш счетчик volatile может помочь решить эту проблему.

Тем не менее, текущий подход с занятым ожиданием, как правило, не является лучшим решением. Вы должны использовать join() метод класса Thread чтобы позволить вашему главному потоку подождать, пока не закончится каждый из них.

Например:

for(Thread t: threads) {
    try{
        t.join();
    } catch(InterruptedException e) {}
}
Другие вопросы по тегам