Создавайте тысячи клиентов Netty, не создавая при этом тысячи потоков.
Я создал довольно простой сервер с использованием Netty 4. Я смог масштабировать его для обработки нескольких тысяч соединений, и он никогда не поднимается выше ~40 потоков.
Чтобы проверить это, я также создал тестовый клиент, который создает тысячи соединений. К сожалению, это создает столько потоков, сколько и соединений. Я надеялся минимизировать потоки для клиентов. Я посмотрел на много сообщений для этого. Многие примеры показывают настройку одного соединения. Это и это говорит об обмене NioEventLoopGroup между клиентами, что я и делаю. Я получаю ограниченное количество nioEventLoopGroup, но получаю поток для каждого соединения в другом месте. Я специально не создаю потоки в конвейере и не вижу, что может быть.
Вот фрагмент из настройки моего клиентского кода. Кажется, что он должен поддерживать фиксированное количество потоков на основе того, что я исследовал до сих пор. Есть ли что-то, чего мне не хватает, что я должен делать, чтобы предотвратить поток для каждого клиентского соединения?
Главный
final EventLoopGroup group = new NioEventLoopGroup();
for (int i=0; i<100; i++)){
MockClient client = new MockClient(i, group);
client.connect();
}
MockClient
public class MockClient implements Runnable {
private final EventLoopGroup group;
private int identity;
public MockClient(int identity, final EventLoopGroup group) {
this.identity = identity;
this.group = group;
}
@Override
public void run() {
try {
connect();
} catch (Exception e) {}
}
public void connect() throws Exception{
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new MockClientInitializer(identity, this));
final Runnable that = this;
// Start the connection attempt
b.connect(config.getHost(), config.getPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
Channel ch = future.sync().channel();
} else {
//if the server is down, try again in a few seconds
future.channel().eventLoop().schedule(that, 15, TimeUnit.SECONDS);
}
}
});
}
}
1 ответ
Как уже много раз случалось со мной, подробное объяснение проблемы заставило меня задуматься об этом, и я столкнулся с этой проблемой. Я хотел предоставить это здесь, если кто-то еще столкнется с той же проблемой при создании тысяч клиентов Netty.
У меня есть один путь в моем конвейере, который создаст задачу тайм-аута для имитации перезагрузки клиентского соединения. Оказывается, именно эта задача таймера создавала дополнительные потоки для каждого соединения всякий раз, когда он получал сигнал "перезагрузки" от сервера (что происходит очень часто) до тех пор, пока не было потока для каждого соединения.
укротитель
private final HashedWheelTimer timer;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
Packet packet = reboot();
ChannelFutureListener closeHandler = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
RebootTimeoutTask timeoutTask = new RebootTimeoutTask(identity, client);
timer.newTimeout(timeoutTask, SECONDS_FOR_REBOOT, TimeUnit.SECONDS);
}
};
ctx.writeAndFlush(packet).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
future.channel().close().addListener(closeHandler);
} else {
future.channel().close();
}
}
});
}
Задача тайм-аута
public class RebootTimeoutTask implements TimerTask {
public RebootTimeoutTask(...) {...}
@Override
public void run(Timeout timeout) throws Exception {
client.connect(identity);
}
}