Netty chunked входной поток

Я видел много вопросов о кусочных потоках в netty, но большинство из них были решениями об исходящих потоках, а не о входящих потоках.

Я хотел бы понять, как я могу получить данные из канала и отправить их как InputStream в свою бизнес-логику без предварительной загрузки всех данных в памяти. Вот что я пытался сделать:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private HttpServletRequest request;
  private PipedOutputStream os;
  private PipedInputStream is;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    this.os = new PipedOutputStream();
    this.is = new PipedInputStream(os);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
    this.os.close();
    this.is.close();
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf body = ((HttpContent) msg).content();

      if (body.readableBytes() > 0)
        body.readBytes(os, body.readableBytes());

      if (msg instanceof LastHttpContent) {
        os.close();
      }
    }

  }

}

А затем у меня есть другой обработчик, который получит мой CustomHttpRequest и отправит его в то, что я называю ServiceHandler, где моя бизнес-логика будет читать из InputStream.

public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
    @Override
    public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
        future = serviceHandler.handle(request, response);
...

Это не работает, потому что когда мой обработчик перенаправляет запрос CustomHttpRequest в ServiceHandler, и он пытается прочитать из InputStream, поток блокируется, и HttpContent никогда не обрабатывается в моем декодере.

Я знаю, что могу попытаться создать отдельную ветку для своей бизнес-логики, но у меня сложилось впечатление, что я здесь все усложняю.
Я посмотрел на ByteBufInputStream, но он говорит, что

Обратите внимание, что он считывает только количество читаемых байтов, определенных на момент создания.

Поэтому я не думаю, что это будет работать для запросов Chunked Http. Кроме того, я видел ChunkedWriteHandler, который подходит для кусков Oubound, но я не смог найти что-то вроде ChunkedReadHandler...

Итак, мой вопрос: каков наилучший способ сделать это? Мои требования:

- не хранить данные в памяти перед отправкой ServiceHandlers;
- API ServiceHandlers должен быть независимым от netty (поэтому я использую мой CustomHttpRequest вместо NetttReReest);

ОБНОВЛЕНИЕ Я заставил это работать, используя более реактивный подход на CustomHttpRequest. Теперь запрос не предоставляет InputStream для ServiceHandlers, чтобы они могли читать (который блокировал), но вместо этого CustomHttpRequest теперь имеет readInto(OutputStream) метод, который возвращает Future, и весь обработчик службы будет просто выполнен, когда этот Outputstream будет заполнен. Вот как это выглядит

public class CustomHttpRequest {
  ...constructors and other methods hidden...
  private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();

  private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();

  private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);

  public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
    outputStreamFuture.set(os);
    return this.writeCompleteFuture;
  }

  ListenableFuture<Void> writeChunk(byte[] buf) {
    this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
      outputStreamFuture.get().write(buf);
      return Futures.immediateFuture(null);
    });
    return lastWriteFuture;
  }


  void complete() {
    ListenableFuture<Void> future =
        Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
          outputStreamFuture.get().close();
          return Futures.immediateFuture(null);
        });
    addFinallyCallback(future, () -> {
      this.writeCompleteFuture.set(null);
    });

  }
}

И мой обновленный ServletRequestHandler выглядит так:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private NettyHttpServletRequestAdaptor request;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
  }


  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      HttpRequest request = (HttpRequest) msg;

      this.request = new CustomHttpRequest(request, ctx.channel());

      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf buf = ((HttpContent) msg).content();
      byte[] bytes = new byte[buf.readableBytes()];
      buf.readBytes(bytes);

      this.request.writeChunk(bytes);

      if (msg instanceof LastHttpContent) {
        this.request.complete();
      }
    }
  }
}

Это работает довольно хорошо, но, тем не менее, обратите внимание, что все здесь делается в одном потоке, и, возможно, для больших данных я мог бы захотеть создать новый поток, чтобы освободить этот поток для других каналов.

1 ответ

Вы на правильном пути - если ваш serviceHandler.handle(request, response); call выполняет блокирующее чтение, вам нужно создать новый поток для него. Помните, что должно быть только небольшое количество рабочих потоков Netty, поэтому вы не должны делать никаких блокирующих вызовов в рабочих потоках.

Другой вопрос, который нужно задать, должен ли ваш сервисный обработчик блокироваться? Что оно делает? В любом случае, если данные будут передаваться по сети, можете ли вы включить их в конвейер Netty неблокирующим образом? Таким образом, все полностью асинхронно, никаких блокирующих вызовов и дополнительных потоков не требуется.

Другие вопросы по тегам