Java Executors: как мне установить приоритет задачи?

Есть ли возможность установить приоритет задач, которые выполняются исполнителями? Я нашел некоторые утверждения в JCIP о том, что это возможно, но я не могу найти ни одного примера и не могу найти ничего связанного с документами.

От JCIP:

Политика выполнения определяет "что, где, когда и как" выполнения задачи, включая:

  • ...
  • В каком порядке должны выполняться задачи (FIFO, LIFO, приоритетный порядок)?
  • ...

UPD: я понял, что спросил не совсем то, что хотел спросить. Что я действительно хотел, так это:

Как использовать / эмулировать установку приоритетов потоков (то есть что было thread.setPriority()а) с исполнителями рамок?

8 ответов

Решение

В настоящее время единственными конкретными реализациями интерфейса Executor являются ThreadPoolExecutor и ScheduledThreadpoolExecutor

Вместо использования утилиты / фабрики класса Executors, вы должны создать экземпляр, используя конструктор.

Вы можете передать BlockingQueue конструкторам ThreadPoolExecutor.

Одна из реализаций BlockingQueue, PriorityBlockingQueue, позволяет передавать компаратор конструктору, что позволяет вам определять порядок выполнения.

Идея в том, чтобы использовать PriorityBlockingQueue в исполнителе. За это:

  • Создайте компаратор, который бы сравнивал наше будущее.
  • Создайте прокси на будущее, чтобы иметь приоритет.
  • Переопределите 'newTaskFor', чтобы обернуть каждое будущее в наш прокси.

Прежде всего, вам нужно уделить приоритетное внимание своему будущему:

    class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }
}

Далее необходимо определить компаратор, который бы правильно сортировал приоритетные фьючерсы:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Теперь давайте предположим, что у нас длинная работа, подобная этой:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Тогда для выполнения этих заданий в приоритетном порядке код будет выглядеть так:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Это много кода, но это почти единственный способ сделать это.

На моей машине вывод выглядит примерно так:

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97

Вы можете реализовать свой собственный ThreadFactory и установить его в ThreadPoolExecutor следующим образом:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

где мой OpJobThreadFactory выглядит следующим образом:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}

Вы можете указать ThreadFactory в ThreadPoolExecutor конструктор (или Executors заводской метод). Это позволяет вам предоставлять потоки с заданным приоритетом для исполнителя.

Чтобы получить разные приоритеты потоков для разных заданий, вам необходимо отправить их исполнителям с разными фабриками потоков.

Вы можете использовать ThreadPoolExecutor с очередью блокировки приоритетов. Как реализовать PriorityBlockingQueue с ThreadPoolExecutor и пользовательские задачи

Помните, что setPriority(..) обычно не работает под Linux. Смотрите следующие ссылки для получения полной информации:

Просто хочу добавить свой вклад в это обсуждение. Я реализовал этот ReorderingThreadPoolExecutor для очень конкретной цели, которая заключается в возможности явного вывода на передний план BlockingQueue исполнителя (в данном случае LinkedBlockingDeque) всякий раз, когда я хочу и без необходимости иметь дело с приоритетами (которые могут привести к тупикам и Во всяком случае, исправлено).

Я использую это для управления (внутри приложения Android) случаем, когда мне нужно загрузить много изображений, которые отображаются в виде длинного списка. Всякий раз, когда пользователь быстро прокручивает страницу вниз, очередь исполнителей заполняется запросами на загрузку изображений: перемещая последние в верхнюю часть очереди, я добивался гораздо лучших результатов при загрузке изображений, которые на самом деле находятся на экране, что задерживает загрузку те, которые, вероятно, понадобятся позже. Обратите внимание, что я использую внутренний ключ одновременной карты (который может быть таким же простым, как строка URL-адреса изображения), чтобы добавить задачи исполнителю, чтобы я мог получить их позже для переупорядочения.

Было бы много других способов сделать то же самое, и, может быть, это слишком сложно, но это работает хорошо, и Facebook в своем Android SDK делает нечто подобное в своей очереди рабочих потоков.

Не стесняйтесь взглянуть на код и дать мне предложения, он находится внутри проекта Android, но удаление нескольких журналов и аннотаций сделало бы класс чистой Java 6.

Если это просто попытка отдать предпочтение одному потоку над другим, а не гарантировать какой-либо порядок. В Callable вы передаете установленный приоритет потока в начале метода call ():

      private int priority;

MyCallable(int priority){
this.priority=priority;
}

public String call() {

     logger.info("running callable with priority {}", priority);
     Thread.currentThread().setPriority(priority);

// do stuff

     return "something";
}

все еще зависит от базовой реализации, чтобы соблюдать приоритет потока, хотя

Другие вопросы по тегам