Потоки ThreadPoolExecutor, конфликтующие с другими потоками

Я работаю над улучшением для существующего приложения Java. Приложение представляет собой процессор сообщений, который ежедневно обрабатывает несколько миллионов сообщений. Он в основном написан с использованием Core Java с потоками, а очереди реализованы с использованием классов Collection.

В этом приложении некоторые типы сообщений выполняются в одном потоке. Передо мной была поставлена ​​задача сделать эту конкретную часть приложения многопоточной, чтобы быстрее обрабатывать сообщения, поскольку у нас есть два процессора.

Поскольку мы используем Java 5, я использовал подход ThreadPoolExcecutor. Я создал процессорные потоки для каждого клиента, чтобы сообщения для определенных потоков могли обрабатываться в своем собственном потоке. Потоки процессора реализуют интерфейс Callable, поскольку это позволит мне проверить будущий объект, завершено ли предыдущее задание или нет.

В процессе инициализации я перейду по всем клиентам и создам потоки процессора для каждого из них и сохраню их в карте, используя их идентификатор в качестве уникального ключа. Чтобы отслеживать ранее отправленную работу, я снова сохраняю будущий объект на другой карте, используя тот же идентификатор, что и уникальный ключ.

Ниже приведен фрагмент кода, который я использовал: В основном классе -

ThreadPoolExecutor  threadPool = null;
int poolSize = 20;
int maxPoolSize = 50;
long keepAliveTime = 10;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000);
threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,keepAliveTime, TimeUnit.SECONDS, queue);

   ....
   ....
 for (each client...) {
   id = getId()..
   future = futuremap.get(id);
   if(!future.isDone())
      continue;
   if(future == null || future.isDone()) {
      processor = processormap.get(id);
      if(processor == null) {
         processor = new Processor(.....);
         //add to the map
         processormap.put(id,processor);
      }
      //submit the processor
      future = threadPool.submit(processor );
      futuremap.put(id,future);
 }
} 

Процессорная нить

public class MyProcessor implements Callable<String> {
        .....
        .....
    public String call() {
        ....
        ....
    }
 }

Проблема

Вышеуказанная реализация работает хорошо в моей тестовой среде. Однако в производственной среде (Edit # 1 - Ubuntu, Linux Slackware, Java - 1.6.0_18) мы заметили, что другие потоки приложения, которые не управляются с помощью этого нового ThreadpoolExecutor, оказываются затронутыми. их задачи откладываются на несколько часов. Это потому, что потоки, созданные ThreadPoolExecutors, берут все ресурсы или что-то еще и не дают возможности другим потокам.

Новые потоки, созданные с помощью ThreadPoolExceutor, выполняют самостоятельную задачу и не конфликтуют с другими потоками за ресурсы. то есть, нет сценария состояния гонки.

В журнале для новых потоков видно, что работает максимум 20 потоков (corepoolsize), и нет исключений отклонения, т. Е. Количество отправлений находится в пределах очереди.

Есть идеи, почему это происходит?

Заранее спасибо.

1 ответ

Мой предыдущий опыт работы с потоками в Linux ясно показал, что он гораздо более подвержен истощению потоков, когда очень загружен, чем та же система, работающая в Windows под той же нагрузкой. Я написал тестовую программу, которая показала, что при большой загрузке ЦП несколько потоков получат гораздо больше процессорного времени, чем другие, при использовании стандартных примитивов ожидания / уведомления - IIRC, на порядок. Мое решение состояло в том, чтобы использовать блокировку повторного входа в честном режиме, чтобы округлить их.

Опять IIRC, бросая в Thread.yield По завершении каждого рабочего мероприятия положительного эффекта не было.

На все это может существенно повлиять то, какая из нескольких библиотек потоков используется вашим дистрибутивом Linux.

Вы можете получить некоторое улучшение, добавив удушитель в рабочую очередь, которая доминирует в рабочей нагрузке, хотя было бы лучше, если бы он каким-то образом адаптировался к объему работы, ожидающей в других несвязанных потоках.

Другие вопросы по тегам