Spring CachingConnectionFactory ограничивает каналы и вызывает блокировку потоков

У меня есть простой издатель сообщений, который публикует rabbitmq (местный) с очень высокой скоростью.

ExecutorService threadPool = Executors.newFixedThreadPool(5, factory);

for (int i = 1; i <= 1000000; i++) {
    int finalI = i;
    threadPool.submit(new Runnable() {
        @Override
        public void run() {
            messagePublisher.publishMessage("testExchange", "testRoutingKey1", message + String.valueOf(finalI));
        }
    });
}

Здесь messagePublisher является экземпляром следующего класса

public class ChannelCachedMessagePublisher {

    private static CachingConnectionFactory CACHING_CONNECTION_FACTORY = new CachingConnectionFactory("localhost");
    private static RabbitTemplate RABBIT_TEMPLATE;

    private void init() {
        if (!INITIALIZED) {
            synchronized (ConnectionCachedMessagePublisher.class) {
                CACHING_CONNECTION_FACTORY.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
                CACHING_CONNECTION_FACTORY.setChannelCacheSize(100);
                CACHING_CONNECTION_FACTORY.setVirtualHost("testVHost");
                CACHING_CONNECTION_FACTORY.setConnectionNameStrategy(CACHING_CONNECTION_FACTORY -> "arpit-cached-connection");
                CACHING_CONNECTION_FACTORY.setPublisherConfirms(false);
                RABBIT_TEMPLATE = new RabbitTemplate(CACHING_CONNECTION_FACTORY);
            }
            INITIALIZED = true;
        }
    }

    public void publishMessage(String exchange, String routingKey, String message) {

        RABBIT_TEMPLATE.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = message.getMessageProperties();
                properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                return new org.springframework.amqp.core.Message(message.getBody(),properties);
            }
        });
    }
}

После выполнения этого кода 5 потоков создают только 5 каналов через одно соединение.

РЕДАКТИРОВАТЬ 11). Почему фабрика соединений не предоставляет здесь более 5 потоков и не связывает потоки в каждом потоке? Эта часть теперь понятна, поскольку фабрика соединений не может предоставить больше каналов (или соединений), чем количество запущенных потоков.

Затем сразу все потоки начинают блокироваться над объектом монитора в SocketFrameHandler.

"arpit-rmq-5" # 1019 prio = 5 os_prio = 31 tid = 0x00007fb90c5b8800 nid = 0x83303, ожидающий запись монитора [0x0000700047bca000]
java.lang.Thread.State: BLOCKED (на мониторе объекта) в com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:170)

Вот вид выхода из темы пост исполнения

2). Почему именно эти потоки блокируются и какие могут быть обходные пути для их решения?

ПРИМЕЧАНИЕ: Это приводит к переводу моего соединения с кроликом в состояние управления потоком, с которым я в порядке. Моя главная проблема - блокировка потоков в JVM.

0 ответов

Другие вопросы по тегам