Spring CachingConnectionFactory ограничивает каналы и вызывает блокировку потоков
У меня есть простой издатель сообщений, который публикует rabbitmq (местный) с очень высокой скоростью.
ExecutorService threadPool = Executors.newFixedThreadPool(5, factory);
for (int i = 1; i <= 1000000; i++) {
int finalI = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
messagePublisher.publishMessage("testExchange", "testRoutingKey1", message + String.valueOf(finalI));
}
});
}
Здесь messagePublisher является экземпляром следующего класса
public class ChannelCachedMessagePublisher {
private static CachingConnectionFactory CACHING_CONNECTION_FACTORY = new CachingConnectionFactory("localhost");
private static RabbitTemplate RABBIT_TEMPLATE;
private void init() {
if (!INITIALIZED) {
synchronized (ConnectionCachedMessagePublisher.class) {
CACHING_CONNECTION_FACTORY.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
CACHING_CONNECTION_FACTORY.setChannelCacheSize(100);
CACHING_CONNECTION_FACTORY.setVirtualHost("testVHost");
CACHING_CONNECTION_FACTORY.setConnectionNameStrategy(CACHING_CONNECTION_FACTORY -> "arpit-cached-connection");
CACHING_CONNECTION_FACTORY.setPublisherConfirms(false);
RABBIT_TEMPLATE = new RabbitTemplate(CACHING_CONNECTION_FACTORY);
}
INITIALIZED = true;
}
}
public void publishMessage(String exchange, String routingKey, String message) {
RABBIT_TEMPLATE.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = message.getMessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return new org.springframework.amqp.core.Message(message.getBody(),properties);
}
});
}
}
После выполнения этого кода 5 потоков создают только 5 каналов через одно соединение.
РЕДАКТИРОВАТЬ 11). Почему фабрика соединений не предоставляет здесь более 5 потоков и не связывает потоки в каждом потоке? Эта часть теперь понятна, поскольку фабрика соединений не может предоставить больше каналов (или соединений), чем количество запущенных потоков.
Затем сразу все потоки начинают блокироваться над объектом монитора в SocketFrameHandler.
"arpit-rmq-5" # 1019 prio = 5 os_prio = 31 tid = 0x00007fb90c5b8800 nid = 0x83303, ожидающий запись монитора [0x0000700047bca000]
java.lang.Thread.State: BLOCKED (на мониторе объекта) в com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:170)
Вот вид выхода из темы пост исполнения
2). Почему именно эти потоки блокируются и какие могут быть обходные пути для их решения?
ПРИМЕЧАНИЕ: Это приводит к переводу моего соединения с кроликом в состояние управления потоком, с которым я в порядке. Моя главная проблема - блокировка потоков в JVM.