Мониторинг состояния работников пула объектов

Я создал эту ветку из другого класса для чтения статуса исполнителей на финише и отмены остальных задач в случае неудачи. Задачи выполняются

Если наблюдается какой-либо сбой, общее состояние должно быть 1 или сбой

final CompletionService completionService = new ExecutorCompletionService(getExecutorService());
final List<Future> futures = new ArrayList<Future>();

    FutureTask<Integer> tasks = new FutureTask<Integer>(new Callable<Integer>() {

        public Integer call() {

            int status = 0;
            boolean fail = false;

            try {
                for (int i = 0; i < 10; i++) {

                    MyRunnable resultObj = null;

                    try {
                        resultObj = (MyRunnable) completionService.take().get();
                    } catch (CancellationException e) {
                        // Skip it ..
                    }

                    if (!fail) {
                        status = resultObj.getStatus();

                        if (status == 1) {
                            fail = true;
                            for (Future future : futures) {
                                if (!future.isCancelled() && !future.isDone())
                                    future.cancel(true); // cancel pending tasks including running tasks 
                            }
                        }
                    }
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }

            return status;
        }

            });

Выше тема запущена -

ExecutorService pool = Executors.newSingleThreadExecutor();
pool.submit(tasks);

Внизу Object заимствован из пула, и это блокирующий вызов, и я установил размер пула в 3, поэтому изначально 3 рабочих MyRunnable создаются немедленно. Когда каждый работник заканчивает работу, его снова используют для выполнения остальных задач.

for (int i = 0 ; i < 10; i ++;) {

    MyRunnable myRunnable = null;
    myRunnable = (MyRunnable) this.getGenericObjectPool().borrowObject();

    set myRunnable ..

    futures.add(completionService.submit(myRunnable, myRunnable));

}

while (!tasks.isDone()) {

        try {
            Thread.sleep(Global.WaitTime());            
        } catch (InterruptedException iex) {            
        }

}

finalStatus = tasks.get();
pool.shutdown();

GenericObjectPool настроен для повторного использования объектов. Я смоделировал тест в IDE, вынудив первый поток завершиться с ошибкой и установив его состояние на 1. Но проблема в том, что как только он был выпущен, он был повторно использован loanObject(), и поток мониторинга увидел измененный объект, статус которого был установлен возвращается к 0 как часть активации нового объекта, которая выполняется GenricObjectPool.

Итак, я не могу прочитать статус из ветки с ошибкой. MyRunnable не Callable, поэтому мне пришлось обмануть Runnable, используя завершение Service.submit(obj,obj)

Эта проблема не произойдет, если размер пула будет равен 10 или более, потому что тогда ни один объект не будет использован повторно, и я успешно прочитаю состояние каждого из них, но это не вариант.

1 ответ

Решение

Я создал CallableDecorator для Runnable, чтобы исправить это. Теперь у меня есть правильное возвращаемое значение, даже используя GenericObjectPool. Поскольку в настоящее время нет зависимости от объекта Pool для чтения статуса, даже повторное использование объекта не приводит к сбросу статуса -

Итак, 2 изменения в коде - Изменение

futures.add(completionService.submit(myRunnable, myRunnable));

в

futures.add(completionService.submit(new CallableDecorator(myRunnable)));

добавить новый класс

public class CallableDecorator implements Callable {

       IRunnable r;

       public CallableDecorator(IRunnable r) {

           this.r = r;
       }

       public Integer call() {

           r.run();
           return r.statusCode();
       }
}

interface IRunnable extends Runnable {
     public Integer statusCode();
}

Аналогично, resultObj должен быть изменен на целочисленный для получения его значения в потоке монитора.

Другие вопросы по тегам