Как я могу показать, что в Java Fork/Join Framework происходит кража работы?

Я хотел бы улучшить мой небольшой пример fork/join, чтобы показать, что во время выполнения платформы Java Fork/Join происходит кража работы.

Какие изменения мне нужно сделать, чтобы следующий код? Цель примера: просто провести линейное исследование распределения значения между несколькими потоками.

package com.stackru.questions;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CounterFJ<T extends Comparable<T>> extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 5075739389907066763L;
    private List<T> _list;
    private T _test;
    private int _lastCount = -1;
    private int _start;
    private int _end;
    private int _divideFactor = 4;

    private static final int THRESHOLD = 20;

    public CounterFJ(List<T> list, T test, int start, int end, int factor) {
        _list = list;
        _test = test;
        _start = start;
        _end = end;
        _divideFactor = factor;

    }

    public CounterFJ(List<T> list, T test, int factor) {
        this(list, test, 0, list.size(), factor);
    }

    @Override
    protected Integer compute() {

        if (_end - _start < THRESHOLD) {
            int count = 0;
            for (int i = _start; i < _end; i++) {
                if (_list.get(i).compareTo(_test) == 0) {
                    count++;
                }
            }
            _lastCount = count;
            return new Integer(count);
        }
        LinkedList<CounterFJ<T>> taskList = new LinkedList<>();

        int step = (_end - _start) / _divideFactor;
        for (int j = 0; j < _divideFactor; j++) {
            CounterFJ<T> task = null;
            if (j == 0)
                task = new CounterFJ<T>(_list, _test, _start, _start + step, _divideFactor);
            else if (j == _divideFactor - 1)
                task = new CounterFJ<T>(_list, _test, _start + (step * j), _end, _divideFactor);
            else
                task = new CounterFJ<T>(_list, _test, _start + (step * j), _start + (step * (j + 1)), _divideFactor);

            // task.fork();
            taskList.add(task);
        }
        invokeAll(taskList);

        _lastCount = 0;
        for (CounterFJ<T> task : taskList) {
            _lastCount += task.join();
        }

        return new Integer(_lastCount);

    }

    public int getResult() {
        return _lastCount;
    }

    public static void main(String[] args) {

        LinkedList<Long> list = new LinkedList<Long>();
        long range = 200;
        Random r = new Random(42);

        for (int i = 0; i < 1000; i++) {
            list.add(new Long((long) (r.nextDouble() * range)));
        }

        CounterFJ<Long> counter = new CounterFJ<>(list, new Long(100), 4);

        ForkJoinPool pool = new ForkJoinPool();
        long time = System.currentTimeMillis();
        pool.invoke(counter);

        System.out.println("Fork join counter in " + (System.currentTimeMillis() - time));
        System.out.println("Occurrences:" + counter.getResult());

    }

}

1 ответ

Решение

Наконец я справился, и это не сложно, поэтому я оставляю это для будущих читателей.

В конструкторе RecursiveTask сохранить поток, который создал сам экземпляр. в compute Метод проверки, является ли выполнение потока тем же или нет. Если не работа-кража произошла.

Поэтому я добавил эту переменную-член

private long _threadId = -1;
private static int stolen_tasks = 0;

изменил конструктор так:

public CounterFJ(List<T> list, T test, int start, int end, int factor) {
        _list = list;
        _threadId = Thread.currentThread().getId(); //added
        _test = test;
        _start = start;
        _end = end;
        _branchFactor = factor;
}

и добавил сравнение в compute метод:

   @Override
   protected Integer compute() {

      long thisThreadId = Thread.currentThread().getId();
      if (_threadId != thisThreadId){
          stolen_tasks++;
      }
   // rest of the method
Другие вопросы по тегам