Как настроить RabbitMQ RPC в веб-контексте
RabbitMQ RPC
Я решил использовать RabbitMQ RPC, как описано здесь.
Моя настройка
Входящие веб-запросы (на Tomcat) будут отправлять RPC-запросы через RabbitMQ различным службам и собирать результаты. Я использую одну очередь ответов с одним пользовательским потребителем, который прослушивает все ответы RPC и собирает их с их идентификатором корреляции в простой хэш-карте. Ничего особенного там нет. Это прекрасно работает в простом интеграционном тесте на уровне контроллера.
проблема
Когда я пытаюсь сделать это в веб-проекте, развернутом на Tomcat, Tomcat отказывается закрываться. jstack и некоторая отладка узнали, что поток порожден для прослушивания ответа RPC и не позволяет Tomcat корректно завершить работу. Я предполагаю, что это потому, что созданный поток создается на уровне приложения, а не на уровне запроса и не управляется Tomcat. Когда я устанавливаю точки останова в Servlet.destroy()
или же ServletContextListener.contextDestroyed(ServletContextEvent sce)
, они не достигнуты, поэтому я не вижу способа вручную почистить вещи.
альтернатива
В качестве альтернативы я мог бы использовать новую очередь ответов (и простой QueueingConsumer) для каждого веб-запроса. Я проверил это, он работает, и Tomcat выключается, как и должно. Но мне интересно, так ли это?.. Может ли кластер RabbitMQ справиться с тысячами (или даже миллионами) коротких очередей / потребителей? Я могу себе представить, что очереди не такие большие, но все же... постоянно транслируются на все узлы кластера... общий объем памяти..
Вопрос
Короче говоря, разумно ли создавать очередь для каждого входящего веб-запроса или как настроить RabbitMQ с одной очередью и потребителем, чтобы Tomcat мог корректно завершить работу?
1 ответ
Я нашел решение для моей проблемы:
Java-клиент создает свои собственные потоки. Есть возможность добавить свой ExecutorService
при создании нового соединения. Делая это в ServletContextListener.initialized()
метод, можно отслеживать ExecutorService
и выключите его вручную в ServletContextListener.destroyed()
метод.
executorService.shutdown();
executorService.awaitTermination(20, TimeUnit.SECONDS);
я использовал Executors.newCachedThreadPool();
поскольку потоки имеют много коротких исполнений, и они очищаются при простое более 60 с.
Это ссылка на ветку группы RabbitMQ Google (спасибо Михаилу Клишину за указание верного направления)