От чего зависит количество потоков, создаваемых Java ForkJoinPool?
Насколько я понял ForkJoinPool
этот пул создает фиксированное количество потоков (по умолчанию: число ядер) и никогда не будет создавать больше потоков (если приложение не указывает на необходимость в них с помощью managedBlock
).
Однако, используя ForkJoinPool.getPoolSize()
Я обнаружил, что в программе, которая создает 30000 задач (RecursiveAction
), ForkJoinPool
при выполнении этих задач в среднем используется 700 потоков (число потоков учитывается при каждом создании задачи). Задачи не делают ввод / вывод, но чистые вычисления; единственная синхронизация между задачами вызывает ForkJoinTask.join()
и доступ AtomicBoolean
s, т.е. нет операций по блокировке потоков.
поскольку join()
не блокирует вызывающий поток, как я понимаю, нет причины, по которой какой-либо поток в пуле должен когда-либо блокироваться, и поэтому (я предполагал) не должно быть никаких причин для создания каких-либо дополнительных потоков (что, очевидно, происходит, тем не менее).
Итак, почему ForkJoinPool
создать столько потоков? Какие факторы определяют количество созданных потоков?
Я надеялся, что на этот вопрос можно будет ответить без публикации кода, но здесь он приходит по запросу. Этот код является выдержкой из программы, в четыре раза превышающей размер, уменьшенный до основных частей; он не компилируется как есть. При желании я, конечно, могу выложить полную программу тоже.
Программа ищет в лабиринте путь от заданной начальной точки до заданной конечной точки, используя поиск в глубину. Решение гарантированно существует. Основная логика в compute()
метод SolverTask
: A RecursiveAction
который начинается в некоторой заданной точке и продолжается со всеми соседними точками, достижимыми из текущей точки. Вместо того, чтобы создавать новый SolverTask
в каждой точке ветвления (которая создала бы слишком много задач) он выталкивает всех соседей, кроме одного, в стек обратного отслеживания для последующей обработки и продолжает работу только с одним соседом, не помещенным в стек. Как только он таким образом достигает тупика, точка, недавно добавленная к стеку обратного отслеживания, выталкивается, и поиск продолжается оттуда (соответственно сокращая путь, построенный из начальной точки taks). Новая задача создается, когда задача обнаруживает, что ее стек отслеживания больше определенного порога; с этого времени задача, продолжая извлекаться из своего стека обратного отслеживания до тех пор, пока она не будет исчерпана, не будет выдвигать дальнейшие точки в свой стек при достижении точки ветвления, но будет создавать новую задачу для каждой такой точки. Таким образом, размер задач может быть скорректирован с использованием порогового предела стека.
Числа, которые я цитировал выше ("30000 задач, в среднем 700 потоков") взяты из поиска в лабиринте 5000x5000 ячеек. Итак, вот основной код:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;
/**
* @return Tries to compute a path through the maze from local start to end
* and returns that (or null if no such path found)
*/
@Override
public ArrayDeque<Point> compute() {
// Is this task still accepting new branches for processing on its own,
// or will it create new tasks to handle those?
boolean stillAcceptingNewBranches = true;
Point current = localStart;
ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>(); // Path from localStart to (including) current
ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
// Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
Direction[] allDirections = Direction.values();
while (!current.equals(end)) {
pathFromLocalStart.addLast(current);
// Collect current's unvisited neighbors in random order:
ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);
for (Direction directionToNeighbor: allDirections) {
Point neighbor = current.getNeighbor(directionToNeighbor);
// contains() and hasPassage() are read-only methods and thus need no synchronization
if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
}
// Process unvisited neighbors
if (neighborsToVisit.size() == 1) {
// Current node is no branch: Continue with that neighbor
current = neighborsToVisit.getFirst().getPoint();
continue;
}
if (neighborsToVisit.size() >= 2) {
// Current node is a branch
if (stillAcceptingNewBranches) {
current = neighborsToVisit.removeLast().getPoint();
// Push all neighbors except one on the backtrack stack for later processing
for(PointAndDirection neighborAndDirection: neighborsToVisit)
backtrackStack.push(neighborAndDirection);
if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
stillAcceptingNewBranches = false;
// Continue with the one neighbor that was not pushed onto the backtrack stack
continue;
} else {
// Current node is a branch point, but this task does not accept new branches any more:
// Create new task for each neighbor to visit and wait for the end of those tasks
SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
int t = 0;
for(PointAndDirection neighborAndDirection: neighborsToVisit) {
SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
task.fork();
subTasks[t++] = task;
}
for (SolverTask task: subTasks) {
ArrayDeque<Point> subTaskResult = null;
try {
subTaskResult = task.join();
} catch (CancellationException e) {
// Nothing to do here: Another task has found the solution and cancelled all other tasks
}
catch (Exception e) {
e.printStackTrace();
}
if (subTaskResult != null) { // subtask found solution
pathFromLocalStart.addAll(subTaskResult);
// No need to wait for the other subtasks once a solution has been found
return pathFromLocalStart;
}
} // for subTasks
} // else (not accepting any more branches)
} // if (current node is a branch)
// Current node is dead end or all its neighbors lead to dead ends:
// Continue with a node from the backtracking stack, if any is left:
if (backtrackStack.isEmpty()) {
return null; // No more backtracking avaible: No solution exists => end of this task
}
// Backtrack: Continue with cell saved at latest branching point:
PointAndDirection pd = backtrackStack.pop();
current = pd.getPoint();
Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
// DEBUG System.out.println("Backtracking to " + branchingPoint);
// Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
// DEBUG System.out.println(" Going back before " + pathSoFar.peekLast());
pathFromLocalStart.removeLast();
}
// continue while loop with newly popped current
} // while (current ...
if (!current.equals(end)) {
// this task was interrupted by another one that already found the solution
// and should end now therefore:
return null;
} else {
// Found the solution path:
pathFromLocalStart.addLast(current);
return pathFromLocalStart;
}
} // compute()
} // class SolverTask
@SuppressWarnings("serial")
public class ParallelMaze {
// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;
/**
* Atomically marks this point as visited unless visited before
* @return whether the point was visited for the first time, i.e. whether it could be marked
*/
boolean visit(Point p) {
return visited[p.getX()][p.getY()].compareAndSet(false, true);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
// Start initial task
long startTime = System.currentTimeMillis();
// since SolverTask.compute() expects its starting point already visited,
// must do that explicitly for the global starting point:
maze.visit(maze.start);
maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
// One solution is enough: Stop all tasks that are still running
pool.shutdownNow();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " +
width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
4 ответа
Есть связанные вопросы по stackru:
ForkJoinPool останавливается во время invokeAll / join
ForkJoinPool, кажется, тратить нить
Я сделал сокращенную версию того, что происходит (я использовал аргументы jvm: -Xms256m -Xmx1024m -Xss8m):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
public class Test1 {
private static ForkJoinPool pool = new ForkJoinPool(2);
private static class SomeAction extends RecursiveAction {
private int counter; //recursive counter
private int childrenCount=80;//amount of children to spawn
private int idx; // just for displaying
private SomeAction(int counter, int idx) {
this.counter = counter;
this.idx = idx;
}
@Override
protected void compute() {
System.out.println(
"counter=" + counter + "." + idx +
" activeThreads=" + pool.getActiveThreadCount() +
" runningThreads=" + pool.getRunningThreadCount() +
" poolSize=" + pool.getPoolSize() +
" queuedTasks=" + pool.getQueuedTaskCount() +
" queuedSubmissions=" + pool.getQueuedSubmissionCount() +
" parallelism=" + pool.getParallelism() +
" stealCount=" + pool.getStealCount());
if (counter <= 0) return;
List<SomeAction> list = new ArrayList<>(childrenCount);
for (int i=0;i<childrenCount;i++){
SomeAction next = new SomeAction(counter-1,i);
list.add(next);
next.fork();
}
for (SomeAction action:list){
action.join();
}
}
}
public static void main(String[] args) throws Exception{
pool.invoke(new SomeAction(2,0));
}
}
Очевидно, что когда вы выполняете соединение, текущий поток видит, что требуемая задача еще не выполнена, и выполняет для себя другую задачу.
Это происходит в java.util.concurrent.ForkJoinWorkerThread#joinTask
,
Однако эта новая задача порождает больше таких же задач, но они не могут найти потоки в пуле, потому что потоки заблокированы в соединении. И поскольку он не может знать, сколько времени потребуется для их освобождения (поток может быть в бесконечном цикле или заблокирован навсегда), новый (-ие) поток (-ы) порождается (Компенсирует объединенные потоки, как упоминал Луи Вассерман)): java.util.concurrent.ForkJoinPool#signalWork
Поэтому, чтобы предотвратить такой сценарий, вам нужно избегать рекурсивного нереста задач.
Например, если в приведенном выше коде вы установили начальный параметр равным 1, количество активных потоков будет равно 2, даже если вы увеличите childrenCount в десять раз.
Также обратите внимание, что, хотя количество активных потоков увеличивается, количество запущенных потоков меньше или равно параллелизму.
Из исходных комментариев:
Компенсация: если уже нет достаточного количества активных потоков, метод tryPreBlock() может создать или повторно активировать запасной поток, чтобы компенсировать заблокированные присоединения, пока они не разблокируются.
Я думаю, что происходит то, что вы не выполняете ни одну из задач очень быстро, и поскольку при отправке новой задачи нет доступных рабочих потоков, создается новый поток.
Строгий, полный-строгий и конечно-строгий имеет отношение к обработке ориентированного ациклического графа (DAG). Вы можете погуглить эти термины, чтобы получить полное представление о них. Это тип обработки, который был разработан для обработки. Посмотрите на код в API для Recursive..., фреймворк использует ваш код compute() для создания других ссылок compute(), а затем выполняет join(). Каждая задача выполняет одно соединение (), как при обработке группы DAG.
Вы не делаете обработку DAG. Вы создаете много новых задач и ждете (join()) для каждой. Прочитайте в исходном коде. Это ужасно сложно, но вы можете понять это. Фреймворк не делает правильного управления задачами. Куда он помещает ожидающую задачу, когда выполняет join()? Нет приостановленной очереди, которая потребовала бы, чтобы поток монитора постоянно просматривал очередь, чтобы увидеть, что закончено. Вот почему фреймворк использует "потоки продолжения". Когда одна задача действительно присоединяется (), фреймворк предполагает, что она ожидает завершения одной более низкой Задачи. Когда присутствует много методов join (), поток не может продолжаться, поэтому должен существовать вспомогательный поток или поток продолжения.
Как отмечалось выше, вам нужен процесс fork-join типа scatter-collect. Там вы можете раскошелиться как можно больше задач
Оба фрагмента кода, опубликованные Holger Peine, и elusive-code самом деле не соответствуют рекомендуемой практике, которая появилась в javadoc для версии 1.8:
В наиболее типичных случаях пара fork-join действует как вызов (fork) и return (join) из параллельной рекурсивной функции. Как и в случае с другими формами рекурсивных вызовов, возврат (объединение) должен выполняться в первую очередь. Например, a.fork (); b.fork (); b.join (); a.join (); вероятно, будет существенно более эффективным, чем объединение кода a перед кодом b.
В обоих случаях FJPool был создан с помощью конструктора по умолчанию. Это приводит к созданию пула с asyncMode = false, который по умолчанию:
@param asyncMode если true,
устанавливает локальный режим планирования "первым пришел - первым вышел" для разветвленных задач, которые никогда не объединяются. Этот режим может быть более подходящим, чем режим по умолчанию для локального стека, в приложениях, в которых рабочие потоки обрабатывают только асинхронные задачи в стиле событий. Для значения по умолчанию используйте false.
таким образом, рабочая очередь на самом деле является лифтом:
голова -> | t4 | T3 | t2 | t1 | ... | <- хвост
Таким образом, во фрагментах они выполняют fork () все задачи, помещая их в стек, а затем join () в том же порядке, то есть от самой глубокой задачи (t1) до самой верхней (t4), эффективно блокирующей, пока какой-то другой поток не украдет (t1), затем (t2) и так далее. Поскольку существует множество задач для блокировки всех потоков пула (task_count >> pool.getParallelism()), компенсация начинается, как описал Louis Wasserman.
Стоит отметить, что вывод кода, отправленного elusive-code, зависит от версии java. Запустив код в java 8, я вижу результат:
...
counter=0.73 activeThreads=45 runningThreads=5 poolSize=49 queuedTasks=105 queuedSubmissions=0 parallelism=2 stealCount=3056
counter=0.75 activeThreads=46 runningThreads=1 poolSize=51 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=3158
counter=0.77 activeThreads=47 runningThreads=3 poolSize=51 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=3157
counter=0.74 activeThreads=45 runningThreads=3 poolSize=51 queuedTasks=5 queuedSubmissions=0 parallelism=2 stealCount=3153
Но при запуске того же кода в java 11 вывод будет другим:
...
counter=0.75 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=4 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.76 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=3 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.77 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=2 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.78 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=1 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.79 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=0