Как я могу показать, что в Java Fork/Join Framework происходит кража работы?
Я хотел бы улучшить мой небольшой пример fork/join, чтобы показать, что во время выполнения платформы Java Fork/Join происходит кража работы.
Какие изменения мне нужно сделать, чтобы следующий код? Цель примера: просто провести линейное исследование распределения значения между несколькими потоками.
package com.stackru.questions;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class CounterFJ<T extends Comparable<T>> extends RecursiveTask<Integer> {
private static final long serialVersionUID = 5075739389907066763L;
private List<T> _list;
private T _test;
private int _lastCount = -1;
private int _start;
private int _end;
private int _divideFactor = 4;
private static final int THRESHOLD = 20;
public CounterFJ(List<T> list, T test, int start, int end, int factor) {
_list = list;
_test = test;
_start = start;
_end = end;
_divideFactor = factor;
}
public CounterFJ(List<T> list, T test, int factor) {
this(list, test, 0, list.size(), factor);
}
@Override
protected Integer compute() {
if (_end - _start < THRESHOLD) {
int count = 0;
for (int i = _start; i < _end; i++) {
if (_list.get(i).compareTo(_test) == 0) {
count++;
}
}
_lastCount = count;
return new Integer(count);
}
LinkedList<CounterFJ<T>> taskList = new LinkedList<>();
int step = (_end - _start) / _divideFactor;
for (int j = 0; j < _divideFactor; j++) {
CounterFJ<T> task = null;
if (j == 0)
task = new CounterFJ<T>(_list, _test, _start, _start + step, _divideFactor);
else if (j == _divideFactor - 1)
task = new CounterFJ<T>(_list, _test, _start + (step * j), _end, _divideFactor);
else
task = new CounterFJ<T>(_list, _test, _start + (step * j), _start + (step * (j + 1)), _divideFactor);
// task.fork();
taskList.add(task);
}
invokeAll(taskList);
_lastCount = 0;
for (CounterFJ<T> task : taskList) {
_lastCount += task.join();
}
return new Integer(_lastCount);
}
public int getResult() {
return _lastCount;
}
public static void main(String[] args) {
LinkedList<Long> list = new LinkedList<Long>();
long range = 200;
Random r = new Random(42);
for (int i = 0; i < 1000; i++) {
list.add(new Long((long) (r.nextDouble() * range)));
}
CounterFJ<Long> counter = new CounterFJ<>(list, new Long(100), 4);
ForkJoinPool pool = new ForkJoinPool();
long time = System.currentTimeMillis();
pool.invoke(counter);
System.out.println("Fork join counter in " + (System.currentTimeMillis() - time));
System.out.println("Occurrences:" + counter.getResult());
}
}
1 ответ
Решение
Наконец я справился, и это не сложно, поэтому я оставляю это для будущих читателей.
В конструкторе RecursiveTask
сохранить поток, который создал сам экземпляр. в compute
Метод проверки, является ли выполнение потока тем же или нет. Если не работа-кража произошла.
Поэтому я добавил эту переменную-член
private long _threadId = -1;
private static int stolen_tasks = 0;
изменил конструктор так:
public CounterFJ(List<T> list, T test, int start, int end, int factor) {
_list = list;
_threadId = Thread.currentThread().getId(); //added
_test = test;
_start = start;
_end = end;
_branchFactor = factor;
}
и добавил сравнение в compute
метод:
@Override
protected Integer compute() {
long thisThreadId = Thread.currentThread().getId();
if (_threadId != thisThreadId){
stolen_tasks++;
}
// rest of the method