Проблемы нагрузочного тестирования Java Netty
Я написал сервер, который принимает сообщения о соединении и бомбардирует ( ~100 байт), используя текстовый протокол, и моя реализация способна отправлять около 400К / сек сообщений с обратной связью с помощью стороннего клиента. Я выбрал Netty для этой задачи, SUSE 11 RealTime, JRockit RTS. Но когда я начал разрабатывать свой собственный клиент на основе Netty, я столкнулся с резким снижением пропускной способности (с 400K до 1,3K msg/sec). Код клиента довольно прост. Не могли бы вы дать совет или показать примеры, как написать гораздо более эффективный клиент. Я, на самом деле, больше беспокоюсь о задержке, но начал с тестов пропускной способности, и я не думаю, что нормально иметь 1,5Kmsg/sec для обратной связи. Цель клиента PS - получать сообщения только с сервера и очень редко отправлять сообщения.
Client.java
public class Client {
private static ClientBootstrap bootstrap;
private static Channel connector;
public static boolean start()
{
ChannelFactory factory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory( new ClientPipelineFactory() );
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("receiveBufferSize", 1048576);
ChannelFuture future = bootstrap
.connect(new InetSocketAddress("localhost", 9013));
if (!future.awaitUninterruptibly().isSuccess()) {
System.out.println("--- CLIENT - Failed to connect to server at " +
"localhost:9013.");
bootstrap.releaseExternalResources();
return false;
}
connector = future.getChannel();
return connector.isConnected();
}
public static void main( String[] args )
{
boolean started = start();
if ( started )
System.out.println( "Client connected to the server" );
}
}
ClientPipelineFactory.java
public class ClientPipelineFactory implements ChannelPipelineFactory{
private final ExecutionHandler executionHandler;
public ClientPipelineFactory( ExecutionHandler executionHandle )
{
this.executionHandler = executionHandle;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
1024, Delimiters.lineDelimiter()));
pipeline.addLast( "executor", executionHandler);
pipeline.addLast("handler", new MessageHandler() );
return pipeline;
}
}
MessageHandler.java
public class MessageHandler extends SimpleChannelHandler{
long max_msg = 10000;
long cur_msg = 0;
long startTime = System.nanoTime();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
cur_msg++;
if ( cur_msg == max_msg )
{
System.out.println( "Throughput (msg/sec) : " + max_msg* NANOS_IN_SEC/( System.nanoTime() - startTime ) );
cur_msg = 0;
startTime = System.nanoTime();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
Обновить. На стороне сервера есть периодический поток, который пишет в принятый клиентский канал. И канал скоро станет недоступным для записи. Обновление N2. В конвейер добавлен OrderedMemoryAwareExecutor, но пропускная способность по-прежнему очень низкая (около 4 Кбит / с)
Исправлена. Я поставил executor перед всем стеком конвейера, и это сработало!
1 ответ
Если сервер отправляет сообщения с фиксированным размером (~100 байт), вы можете установить ReceiveBufferSizePredictor для начальной загрузки клиента, это оптимизирует чтение
bootstrap.setOption("receiveBufferSizePredictorFactory",
new AdaptiveReceiveBufferSizePredictorFactory(MIN_PACKET_SIZE, INITIAL_PACKET_SIZE, MAX_PACKET_SIZE));
В соответствии с сегментом кода, который вы опубликовали: рабочий поток клиента nio делает все в конвейере, поэтому он будет занят декодированием и выполнением обработчиков сообщений. Вы должны добавить обработчик выполнения.
Вы сказали, что канал становится недоступным для записи со стороны сервера, поэтому вам может потребоваться настроить размеры водяных знаков в загрузчике сервера. Вы можете периодически отслеживать размер буфера записи (размер очереди записи) и следить за тем, чтобы канал становился недоступным для записи из-за невозможности записи в сеть. Это можно сделать, используя класс util, как показано ниже.
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel;
public final class NioChannelUtil {
public static long getWriteTaskQueueCount(Channel channel) {
NioSocketChannel nioChannel = (NioSocketChannel) channel;
return nioChannel.writeBufferSize.get();
}
}