Как создать цепочку / слой Phasers

Я пишу многопоточное приложение, которое использует Phaser, чтобы знать, когда закончить работу. Проблема в том, что в ExecutorCompletionService может быть даже 100 тыс. Потоков в очереди, но максимальное число незапланированных сторон в Phaser равно 65535. Что я могу сделать, когда прибывает 65536 человек?

Мой пример кода:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

Проблема в методе addParties(). Одиночный поток (SimpleParser) может возвращать, т. Е. 100 новых имен файлов, и в ExecutorCompletionService будет отправлено 100 новых потоков, а в Phaser будет зарегистрировано 100 новых участников. Я пытался использовать что-то вроде этого:

if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

и создать цепочку фазеров, но это не помогло, потому что p.getUnarrivedParties() возвращает 0, но я не могу зарегистрировать следующую партию для него...

    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

печатает:

65535

0

и выбрасывает IllegalStateException

Итак, как я могу создать новый Phaser, который будет связан с этим старым?

//редактировать

Спасибо, @bowmore. У меня есть только два вопроса.

Давайте посмотрим на пример:

import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child1.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

Это печатает:

Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

Почему после child1.arriveAndDeregister(); child1 не завершен, и как проверить, если это на самом деле?

Второй вопрос Я спросил о создании нового Phaser после 65535 вечеринок, потому что я думал, что создавать тысячи новых объектов бесполезно - как вы думаете, не будет ли проблем с памятью с этим, или, возможно, это может даже улучшить производительность?

1 ответ

Вместо регистрации с существующим Phaser новые процессы могут регистрироваться на вновь созданном дочернем элементе Phaser оригинала. Создание ребенка Phaser это делается просто путем предоставления родителя Phaser конструктору ребенка.

public static void addParties(Phaser p, List<String> filenames,
                              ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}

Если вы хотите создавать дочерние Фазеры только при достижении определенного порога, вы можете проверить количество зарегистрированных партий, а не количество необработанных:

public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}

РЕДАКТИРОВАТЬ:

Чтобы ответить на вопрос 1: ребенок Phasers делят свое состояние завершения с корнем Phaserвот реализация isTerminated()

public boolean isTerminated() {
    return root.state < 0L;
}

В продолжение вопроса 2: родительские Phasers на самом деле не хранят ссылки на свои дочерние Phasers. Как только на дочерний фазер больше не ссылаются, он получает право на сборку мусора. Вы бы просто лучше следовали советам, которые есть в javadoc:

Наилучшее значение TASKS_PER_PHASER зависит главным образом от ожидаемых скоростей синхронизации. Значение, равное четырем, может быть подходящим для очень маленьких задачных фаз (таким образом, высокие показатели) или до сотен для очень больших.

Основная причина для многоуровневого управления - снижение количества конфликтов, связанных с синхронизацией, поэтому, если у вас есть задачи небольшого веса, лучше использовать меньше задач на фазер. Это никогда не повредит профилировать различные настройки, чтобы настроить эти вещи.

Другие вопросы по тегам