RecursiveTask Throwing StackruError При выполнении ForkJoin

Я разработал RecursiveTask

Вот код для задачи, которую я разработал.

public class SearchTask extends RecursiveTask<Map<Short, Long>> {

private static final long serialVersionUID = 1L;
private int majorDataThreshold = 16001;
private ConcurrentNavigableMap<Short, Long> dataMap;
private long fromRange;
private long toRange;
private boolean fromInclusive;
private boolean toInclusive;

public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
        final boolean fromInclusive, final boolean toInclusive) {
    this.dataMap = new ConcurrentSkipListMap<>(dataSource);
    this.fromRange = fromRange;
    this.toRange = toRange;
    this.fromInclusive = fromInclusive;
    this.toInclusive = toInclusive;
}

@Override
protected Map<Short, Long> compute() {
    final int size = dataMap.size();
    // This is not a perfect RecursiveTask, because the if condition is designed to overcome a stackru error when map filled with 32k data
    if (size > majorDataThreshold+1000) {
        // List<SearchTask> tasks = createSubtasks();
        // tasks.get(0).fork();
        // tasks.get(1).fork();

        // Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
        // map.putAll(tasks.get(1).join());
        // return map;

        return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
    }
    return search();
}

private List<SearchTask> createSubtasks() {
    final short lastKey = dataMap.lastKey();
    final short midkey = (short) (lastKey / 2);
    final short firstKey = dataMap.firstKey();
    final List<SearchTask> dividedTasks = new ArrayList<>();
    dividedTasks.add(
            new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
                    fromRange, toRange, fromInclusive, toInclusive));
    dividedTasks
            .add(new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
                    fromRange, toRange, fromInclusive, toInclusive));
    return dividedTasks;
}

private Map<Short, Long> search() {
    final Map<Short, Long> result = dataMap.entrySet().stream()
            .filter(serchPredicate(fromRange, toRange, fromInclusive, toInclusive))
            .collect(Collectors.toConcurrentMap(p -> p.getKey(), p -> p.getValue()));
    return result;
}

private static Predicate<? super Entry<Short, Long>> serchPredicate(final long fromValue, final long toValue,
        final boolean fromInclusive, final boolean toInclusive) {
    if (fromInclusive && !toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() < toValue);
    else if (!fromInclusive && toInclusive)
        return p -> (p.getValue() > fromValue && p.getValue() <= toValue);
    else if (fromInclusive && toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() <= toValue);
    else
        return p -> (p.getValue() > fromValue && p.getValue() < toValue);
}

Максимальное количество обработанных данных для этой задачи - 32000 (32 КБ).

В коде я делю задачи, если он проходит порог

 if (size > majorDataThreshold)

Когда я пытаюсь уменьшить значение majorDataThreshold меньше 16001, я получаю сообщение об ошибке

Трассировки стека

at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackruError
    ... 1024 more
Caused by: java.lang.StackruError
    ... 1024 more
    .................Same trace
Caused by: java.lang.StackruError
    at java.util.Collection.stream(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
    at java.util.concurrent.RecursiveTask.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)

Чтобы решить это я пытался использовать

Collectors.toMap()
ConcurrentHashMap
Join Manually

Проблема не решена

Может ли кто-нибудь помочь мне найти, что не так в моем RecursiveTaskзадача.

Код модульного теста

public class Container32kUniqueDataTest {

private ForkJoinRangeContainer forkJoinContianer;

@Before
public void setUp(){
    long[] data = genrateTestData();
    forkJoinContianer = new ForkJoinRangeContainer(data)
}

private long[] genrateTestData(){
    long[] data= new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i]=i+1;
    }
    return data;
}

@Test
public void runARangeQuery_forkJoin(){
    Set<Short> ids = forkJoinContianer.findIdsInRange(14, 17, true, true);
    assertEquals(true, ids.size()>0);
}
}   

Обезжиренная версия кода контейнера

public class ForkJoinRangeContainer {

private Map<Short, Long> dataSource = new HashMap<Short, Long>();

public ForkJoinRangeContainer(long[] data) {
    populateData(data);
}

private void populateData(final long[] data) {
    for (short i = 0; i < data.length; i++) {
        dataSource.put(i, data[i]);
    }
}

public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
    Map<Short, Long> map = forkJoinPool.invoke(task);
    forkJoinPool.shutdown();
    return map.keySet();
}

public static void main(String[] args) {

    long[] data = new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i] = i + 1;
    }
    ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
    Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
    if (ids.size() > 0) {
        System.out.println("Found Ids");
    }
}

1 ответ

Вы застряли в бесконечном цикле в SearchTask return ForkJoinTask.invokeAll(createSubtasks())

CreateSubtasks () снова и снова создает подзадачи с одинаковыми значениями, поскольку вы никогда не уменьшаете размер dataMap.

F/J работает, разбивая объект на Левый и Правый. Каждый Левый и Правый создает новые Левый и Правый с половиной их значения. Это деление пополам продолжается до порога, когда вы "делаете работу".

Первым уроком в программировании, который я когда-либо усвоил, было Keep It Simple.

Вы смешиваете Map, ArrayMap, ConcurrentSkipListMap, ConcurrentNavigableMap, List, stream.Collectors, HashMap и Set вместе с классами F/J. Наиболее запутанно, что делает его очень сложным для следования и обычно приводит к провалу. Просто лучше.

Когда вы создаете список для ForkJoinTask.invokeAll(), создайте список за один раз до вызова invoke(). Список должен содержать все подзадачи, необходимые для выполнения работы, каждая подзадача вдвое меньше предыдущей. Не используйте поток; у вас нет потока, только несколько подзадач в списке.

Либо так, либо разделите Left и Right и выполните Left.fork() Right.fork(). Затем каждое раздвоенное задание снова делится с половиной значения и т. Д.

Точно, как уменьшить объект dataMap "размер, чтобы разделить" зависит от вас.

Другие вопросы по тегам