Как реализовать PriorityBlockingQueue с ThreadPoolExecutor и пользовательские задачи
Я много искал, но не смог найти решение своей проблемы.
У меня есть свой собственный класс, BaseTask
, который использует ThreadPoolExecutor
для решения задач.
Если я не хочу расставлять приоритеты (т.е. используя LinkedBlockingQueue
) это работает просто отлично, но когда я пытаюсь использовать PriorityBlockingQueue
я получил ClassCastException
поскольку ThreadPoolExecutor
оборачивает мои задачи в FutureTask
объект.
Это явно нормально, потому что FutureTask
не реализует Comparable
, но как бы я решил проблему с приоритетом?
Я читал, что вы можете переопределить newTaskFor
в ThreadPoolExecutor
, но я не могу найти этот метод вообще...?
Любые предложения будут высоко ценится!
Некоторый код, чтобы помочь:
В моем BaseTask
класс у меня есть
private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);
private final BaseFutureTask<Result> mFuture;
public BaseTask(int priority) {
mFuture = new BaseFutureTask<Result>(mWorker, priority);
}
public final BaseTask<Params, Progress, Result> execute(Params... params) {
/* Some unimportant code here */
sExecutor.execute(mFuture);
}
В BaseFutureTask
учебный класс
@Override
public int compareTo(BaseFutureTask another) {
long diff = this.priority - another.priority;
return Long.signum(diff);
}
В BaseThreadPoolExecutor
класс я переопределить 3 submit
методы...
Конструктор в этом классе вызывается, но ни один из submit
методы
6 ответов
public class ExecutorPriority {
public static void main(String[] args) {
PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());
Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
exe.execute(new RunWithPriority(2) {
@Override
public void run() {
System.out.println(this.getPriority() + " started");
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println(this.getPriority() + " finished");
}
});
exe.execute(new RunWithPriority(10) {
@Override
public void run() {
System.out.println(this.getPriority() + " started");
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println(this.getPriority() + " finished");
}
});
}
private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {
@Override
public int compare(T o1, T o2) {
return o1.getPriority().compareTo(o2.getPriority());
}
}
}
как вы можете догадаться, RunWithPriority - это абстрактный класс, который является Runnable и имеет поле приоритета Integer
Вы можете использовать эти вспомогательные классы:
public class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get();
}
public void run() {
src.run();
}
public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
};
}
А ТАКЖЕ
public interface PriorityCallable<T> extends Callable<T> {
int getPriority();
}
И этот вспомогательный метод:
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
}
};
}
И затем используйте это так:
class LenthyJob implements PriorityCallable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor exec = getPriorityExecutor(2);
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}
Я попытаюсь объяснить эту проблему с помощью полнофункционального кода. Но прежде чем углубиться в код, я хотел бы рассказать о PriorityBlockingQueue.
PriorityBlockingQueue: PriorityBlockingQueue является реализацией BlockingQueue. Он принимает задачи вместе с их приоритетом и передает задачу с наивысшим приоритетом для выполнения в первую очередь. Если какие-либо две задачи имеют одинаковый приоритет, то нам нужно предоставить некоторую настраиваемую логику, чтобы решить, какая задача идет первой.
Теперь давайте сразу перейдем к коду.
Класс драйвера: этот класс создает исполнителя, который принимает задачи, а затем отправляет их на выполнение. Здесь мы создаем две задачи, одна с низким приоритетом, а другая с высоким приоритетом. Здесь мы говорим исполнителю запустить MAX из 1 потоков и использовать PriorityBlockingQueue.
public static void main(String[] args) {
/*
Minimum number of threads that must be running : 0
Maximium number of threads that can be created : 1
If a thread is idle, then the minimum time to keep it alive : 1000
Which queue to use : PriorityBlockingQueue
*/
PriorityBlockingQueue queue = new PriorityBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
1000, TimeUnit.MILLISECONDS,queue);
MyTask task = new MyTask(Priority.LOW,"Low");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.HIGH,"High");
executor.execute(new MyFutureTask(task));
}
Класс MyTask: MyTask реализует Runnable и принимает приоритет в качестве аргумента в конструкторе. Когда эта задача выполняется, она печатает сообщение, а затем переводит поток в спящий режим на 1 секунду.
public class MyTask implements Runnable {
public int getPriority() {
return priority.getValue();
}
private Priority priority;
public String getName() {
return name;
}
private String name;
public MyTask(Priority priority,String name){
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("The following Runnable is getting executed "+getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Класс MyFutureTask: поскольку мы используем PriorityBlocingQueue для хранения наших задач, наши задачи должны быть заключены в FutureTask, а наша реализация FutureTask должна реализовывать интерфейс Comparable. Интерфейс Comparable сравнивает приоритет 2 разных задач и отправляет задачу с наивысшим приоритетом на выполнение.
public class MyFutureTask extends FutureTask<MyFutureTask>
implements Comparable<MyFutureTask> {
private MyTask task = null;
public MyFutureTask(MyTask task){
super(task,null);
this.task = task;
}
@Override
public int compareTo(MyFutureTask another) {
return task.getPriority() - another.task.getPriority();
}
}
Приоритетный класс: Самоочевидный Приоритетный класс.
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4);
int value;
Priority(int val) {
this.value = val;
}
public int getValue(){
return value;
}
}
Теперь, когда мы запустим этот пример, мы получим следующий вывод
The following Runnable is getting executed High
The following Runnable is getting executed Low
Несмотря на то, что мы сначала отправили приоритет LOW, но позже задачу HIGH priority, но поскольку мы используем PriorityBlockingQueue, сначала будет выполняться задача с более высоким приоритетом.
Мое решение:
public class XThreadPoolExecutor extends ThreadPoolExecutor
{
public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
{
return new ComparableFutureTask<>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
return new ComparableFutureTask<>(callable);
}
protected class ComparableFutureTask<V>
extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
{
private Object object;
public ComparableFutureTask(Callable<V> callable)
{
super(callable);
object = callable;
}
public ComparableFutureTask(Runnable runnable, V result)
{
super(runnable, result);
object = runnable;
}
@Override
@SuppressWarnings("unchecked")
public int compareTo(ComparableFutureTask<V> o)
{
if (this == o)
{
return 0;
}
if (o == null)
{
return -1; // high priority
}
if (object != null && o.object != null)
{
if (object.getClass().equals(o.object.getClass()))
{
if (object instanceof Comparable)
{
return ((Comparable) object).compareTo(o.object);
}
}
}
return 0;
}
}
}
Этот ответ представляет собой упрощенную версию ответа @StanislavVitvitskyy. Спасибо ему.
Я хотел, чтобы вакансии, которые я отправил, былиComparable
. Я создалExecutorService
с PriorityBlockingQueue
и расширьте его для обработки newTaskFor(...)
методы:
ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, timeUnit, new PriorityBlockingQueue<Runnable>()) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask<T>(runnable, value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new ComparableFutureTask<T>(callable);
};
};
Я определил ComparableFutureTask
который расширяет FutureTask
и реализует Comparable
делегируя job.compareTo(...)
которые отправляются в пул.
public class ComparableFutureTask<T> extends FutureTask<T>
implements Comparable<Object> {
private final Comparable<Object> comparableJob;
@SuppressWarnings("unchecked")
public ComparableFutureTask(Runnable runnable, T value) {
super(runnable, value);
this.comparableJob = (Comparable<Object>) runnable;
}
@SuppressWarnings("unchecked")
public ComparableFutureTask(Callable<T> callable) {
super(callable);
this.comparableJob = (Comparable<Object>) callable;
}
@Override
public int compareTo(Object o) {
return this.comparableJob
.compareTo(((ComparableFutureTask<?>) o).comparable);
}
}
Эта ExecutorService
тогда может справиться Runnable
или Callable
рабочие места, которые также Comparable
. Например:
public class MyJob implements Runnable, Comparable<MyJob> {
private int priority;
...
@Override
public int compareTo(MyJob other) {
// we want higher priority to go first
return other.priority - this.priority;
}
...
}
Важно отметить, что если вы отправляете вакансию, Comparable
в эту очередь он бросит ClassCastException
.
Похоже, они оставили это вне гармонии апача. Примерно год назад есть журнал svn commit, фиксирующий отсутствие newTaskFor
, Вы можете просто переопределить submit
функции в расширенном ThreadPoolExecutor
создать расширенный FutureTask
то есть Comparable
, Они не очень длинные.
Чтобы ответить на ваш вопрос: newTaskFor()
метод находится в ThreadPoolExecutor
суперкласс, AbstractExecutorService
, Вы можете просто переопределить его в ThreadPoolExecutor
, тем не мение.