Netty сервер перестает отвечать
Я имею ServerHandshakeHandler
который расширяется ChannelInboundHandlerAdapter
, Клиент эмулирует множественный доступ к серверу. После некоторого времени успешного общения сервер перестает отвечать, когда клиент пытается подключиться. Он не показывает никаких входящих подключений. Перезапуск клиента не помогает, только перезапуск сервера.
Я попытался установить соединение telnet, когда сервер перестает отвечать: соединение устанавливается, но я не могу получить ответ от сервера (когда сервер находится в нормальном состоянии, он отправляет ответ). Аналогичная ситуация с nmap -v --packet-trace -sT localhost -p {port}
: nmap обнаруживает, что порт открыт, но на сервере нет информации о входящем соединении.
Сервер:
public class ServerHandshakeHandler extends ChannelInboundHandlerAdapter {
private final ChannelGroup group;
private static final byte HANDSHAKE_SUCCEDED = 1;
private static final byte HANDSHAKE_FAILED = 0;
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public ServerHandshakeHandler(ChannelGroup group) {
this.group = group;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LOG.debug("in ServerHandshakeHandler.channelRead");
ByteBuf buf = (ByteBuf) msg;
String someField = getSomeField(buf);
ReferenceCountUtil.release(msg);
if (someField.isEmpty()) {
this.fireHandshakeFailed(ctx);
return;
}
LOG.debug("Removing handshake handler from pipeline.");
ctx.pipeline().remove(this);
this.fireHandshakeSucceeded(ctx);
}
@Override
public void channelActive(final ChannelHandlerContext ctx) {
LOG.debug("in ServerHandshakeHandler.channelActive, group size = " + this.group.size());
this.group.add(ctx.channel());
LOG.debug("Incoming connection from: {}",
ctx.channel().remoteAddress().toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("exception caught ", cause);
if (ctx.channel().isActive()) {
ctx.channel().close();
} else {
this.fireHandshakeFailed(ctx);
}
}
private void fireHandshakeFailed(ChannelHandlerContext ctx) {
LOG.debug("fire handshake failed");
ByteBuf buf = Unpooled.buffer(1);
buf.writeByte(HANDSHAKE_FAILED);
ctx.channel().writeAndFlush(buf);
ctx.channel().close();
ctx.fireUserEventTriggered(HandshakeEvent.handshakeFailed(ctx.channel()));
}
private void fireHandshakeSucceeded(ChannelHandlerContext ctx) {
LOG.debug("fire handshake succeded");
ByteBuf buf = Unpooled.buffer(1);
buf.writeByte(HANDSHAKE_SUCCEDED);
ctx.channel().writeAndFlush(buf);
ctx.fireUserEventTriggered(HandshakeEvent
.handshakeSucceeded(ctx.channel()));
}
}
Клиент:
public class MyClient {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String host;
private int port;
private Socket socket;
public Client(String host, int port) {
this.host = host;
this.port = port;
}
public void send(String id, String message) {
try {
socket = new Socket(host, port);
LOG.debug("connected to server");
if (performHandshake(id)) {
LOG.debug("handshake success");
sendMessage(message);
}
socket.close();;
} catch (IOException ex) {
LOG.error("error while sending data", ex);
}
}
private boolean performHandshake(String id) {
try {
byte[] request = handshakeRequest(id);
writeBytes(request);
byte[] response = readBytes(1);
return (response != null && response.length == 1 && response[0] == 1);
} catch (IOException ex) {
LOG.error("perform handshake error", ex);
return false;
}
}
private byte[] handshakeRequest(String id) throws UnsupportedEncodingException {...}
private void writeBytes(byte[] data) throws IOException {
OutputStream out = socket.getOutputStream();
out.write(data);
}
private byte[] readBytes(int length) throws IOException {
InputStream in = socket.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte buffer[] = new byte[1024];
int currentLength = 0;
while (currentLength < length) {
int size = in.read(buffer); //here client stops waiting server response
if (size == -1) {
throw new IOException("unexpected end of stream");
}
baos.write(buffer, 0, size);
currentLength += size;
}
return baos.toByteArray();
}
}
1 ответ
РЕШИТЬ! Был узкий кусок кода, где я вызывал синхронизированную функцию с подключением к базе данных. Это соединение не может быть установлено по некоторым причинам, и функция зависает. Поток в этой функции переходит в состояние ОЖИДАНИЯ. Через некоторое время другие ступени пытаются получить доступ к этой функции и становятся заблокированными. Поэтому сервер перестает обрабатывать входящие соединения.
Я рекомендую инструмент профилирования jvisualvm, он помог мне найти эту ошибку.