Асинхронный сервер с использованием Java NIO

Я использую этот учебник для создания сервера Java Java без раздела для записи.

Все отлично работает, кроме одной интересной вещи:

  • Когда клиент отправляет пакеты слишком быстро, сервер не получает все сообщения, сервер всегда получает первый и второй пакеты, но не более того.
  • Если клиент отправляет пакеты медленно, сервер получает все пакеты.

Любая идея?

Я добавляю код класса сервера, если вам нужен другой класс, упомянутый в приведенном ниже коде, я здесь:).

Класс НИОСервер:

package server;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

import javax.xml.parsers.ParserConfigurationException;

import org.xml.sax.SAXException;

public class NioServer implements Runnable {



// The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  //the cach will hundle the messages that came
  private Cache cache;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException {
    this.cache = cache;
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }


  private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
      }

  private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        socketChannel.register(this.selector, SelectionKey.OP_READ);
      }

  private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
          numRead = socketChannel.read(this.readBuffer);
          String test = new String(this.readBuffer.array());
          System.out.print(test);

        } catch (IOException e) {
          // The remote forcibly closed the connection, cancel
          // the selection key and close the channel.
        //  key.cancel();
        //  socketChannel.close();
          return;
        }

        if (numRead == -1) {
          // Remote entity shut the socket down cleanly. Do the
          // same from our end and cancel the channel.
          key.channel().close();
          key.cancel();
          return;
        }

        // Hand the data off to our worker thread
        this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
      }

  public void run() {
        while (true) {
          try {
            // Wait for an event one of the registered channels

            this.selector.select();



            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
              SelectionKey key = (SelectionKey) selectedKeys.next();
              selectedKeys.remove();

              if (!key.isValid()) {
                continue;
              }

              // Check what event is available and deal with it
              if (key.isAcceptable()) {
                this.accept(key);
              } else if (key.isReadable()) {
                this.read(key);
              }
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

  public static void main(String[] args) throws ParserConfigurationException, SAXException {
    try {
        Cache cache = new Cache();
        new Thread(cache).start();
      new Thread(new NioServer(null, 9090,cache)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

1 ответ

Я ожидаю, что если вы читаете UDP. Обратите внимание, как медленно вы обрабатываете ваши пакеты на read метод. Вы печатаете их в system.out, что очень медленно, и вы не знаете, как быстро вы сможете обрабатывать данные в другом потоке processData метод. Эта библиотека, которую я написал, может помочь вам установить неблокирующее взаимодействие между потоками, если это является источником вашей задержки. Вы также должны проверить размер вашего основного буфера сокета чтения. Чем оно больше, тем больше места нужно быть быстрым и наверстать упущенное, прежде чем пакеты начнут отбрасываться. Для TCP вы, вероятно, получите IOException на канале, если базовый буфер сокета заполнится. Для UDP пакеты молча отбрасываются.

Чтобы получить доступ к базовому размеру буфера сокета чтения, вы можете сделать:

final Socket socket = channel.socket();
System.out.println(socket.getReceiveBufferSize());
socket.setReceiveBufferSize(newSize);

Примечание: AFAIK, Linux может потребоваться некоторая конфигурация ОС, чтобы вы могли изменить размер основного буфера. Если setReceiveBufferSize не имеет никакого эффекта (прочитайте это снова, чтобы видеть, было ли это изменено), Google об этом.:)

Другие вопросы по тегам