Как вы читаете и распечатываете ответ HTTP с использованием java.net.http по мере поступления фрагментов?

Java 11 представляет новый пакет, java.net.http, для выполнения HTTP-запросов. Для общего использования это довольно просто. Мой вопрос заключается в том, как использовать java.net.http для обработки фрагментированных ответов при получении каждого фрагмента клиентом? java.http.net содержит реактивный BodySubscriber, который, кажется, то, что я хочу, но я не могу найти пример того, как он используется.

http_get_demo.py

Ниже приведена реализация Python, которая печатает куски по мере их поступления, я хотел бы сделать то же самое с java.net.http:

import argparse
import requests


def main(url: str):
    with requests.get(url, stream=True) as r:
        for c in r.iter_content(chunk_size=1):
            print(c.decode("UTF-8"), end="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Read from a URL and print as text as chunks arrive")
    parser.add_argument('url', type=str, help="A URL to read from")
    args = parser.parse_args()

    main(args.url)

HttpGetDemo.java

Просто для полноты вот простой пример создания запроса на блокировку с использованием java.net.http:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

public class HttpGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();
    var response = client.send(request, bodyHandler);
    System.out.println(response.body());

  }
}

HttpAsyncGetDemo.java

А вот пример выполнения неблокирующего / асинхронного запроса:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

/**
 * ReadChunked
 */
public class HttpAsyncGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();

    client.sendAsync(request, bodyHandler)
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();

  }
}

5 ответов

Решение

Спасибо @pavel и @chegar999 за частичные ответы. Они привели меня к моему решению.

обзор

Решение, которое я придумал, ниже. По сути, решение заключается в использовании пользовательских java.net.http.HttpResponse.BodySubscriber, BodySubscriber содержит реактивные методы (onSubscribe, onNext, onError и onComplete) и метод getBody, который в основном возвращает java CompletableFuture, который в конечном итоге создаст тело HTTP-запроса. Когда у вас есть BodySubscriber, вы можете использовать его следующим образом:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create(uri))
    .build();

return client.sendAsync(request, responseInfo -> new StringSubscriber())
    .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
    .thenApply(HttpResponse::body);

Обратите внимание на строку:

client.sendAsync(request, responseInfo -> new StringSubscriber())

Здесь мы регистрируем нашего собственного BodySubscriber; в этом случае мой пользовательский класс называется StringSubscriber,

CustomSubscriber.java

Это полный рабочий пример. Используя Java 11, вы можете запустить его без компиляции. Просто вставьте его в файл с именем CustomSubscriber.java, затем выполните команду java CustomSubscriber <some url>, Он печатает содержимое каждого куска по мере его поступления. Он также собирает их и возвращает их как тело, когда ответ завершен.

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.List;

public class CustomSubscriber {

  public static void main(String[] args) {
    CustomSubscriber cs = new CustomSubscriber();
    String body = cs.get(args[0]).join();
    System.out.println("--- Response body:\n: ..." + body + "...");
  }

  public CompletableFuture<String> get(String uri) {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();

    return client.sendAsync(request, responseInfo -> new StringSubscriber())
        .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
        .thenApply(HttpResponse::body);
  }

  static class StringSubscriber implements BodySubscriber<String> {

    final CompletableFuture<String> bodyCF = new CompletableFuture<>();
    Flow.Subscription subscription;
    List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();

    @Override
    public CompletionStage<String> getBody() {
      return bodyCF;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // Request first item
    }

    @Override
    public void onNext(List<ByteBuffer> buffers) {
      System.out.println("-- onNext " + buffers);
      try {
        System.out.println("\tBuffer Content:\n" + asString(buffers));
      } 
      catch (Exception e) {
        System.out.println("\tUnable to print buffer content");
      }
      buffers.forEach(ByteBuffer::rewind); // Rewind after reading
      responseData.addAll(buffers);
      subscription.request(1); // Request next item
    }

    @Override
    public void onError(Throwable throwable) {
      bodyCF.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
      bodyCF.complete(asString(responseData));
    }

    private String asString(List<ByteBuffer> buffers) {
      return new String(toBytes(buffers), StandardCharsets.UTF_8);
    }

    private byte[] toBytes(List<ByteBuffer> buffers) {
      int size = buffers.stream()
          .mapToInt(ByteBuffer::remaining)
          .sum();
      byte[] bs = new byte[size];
      int offset = 0;
      for (ByteBuffer buffer : buffers) {
        int remaining = buffer.remaining();
        buffer.get(bs, offset, remaining);
        offset += remaining;
      }
      return bs;
    }

  }
}

Пробовать

Чтобы протестировать это решение, вам понадобится сервер, который отправляет ответ, который использует Transfer-encoding: chunked и отправляет его достаточно медленно, чтобы посмотреть, как появляются куски. Я создал его по адресу https://github.com/hohonuuli/demo-chunk-server но вы можете раскрутить его с помощью Docker следующим образом:

docker run -p 8080:8080 hohonuuli/demo-chunk-server

Затем запустите код CustomSubscriber.java, используя java CustomSubscriber.java http://localhost:8080/chunk/10

Код Python не гарантирует, что данные тела ответа будут доступны по одному фрагменту HTTP за раз. Он просто предоставляет небольшое количество данных приложению, тем самым уменьшая объем памяти, потребляемой на уровне приложения (она может быть буферизована ниже в стеке). HTTP-клиент Java 11 поддерживает потоковую передачу через один из обработчиков потокового тела, HttpResponse.BodyHandlers: ofInputStream, ofByteArrayConsumer, asLines, так далее.

Или напишите свой собственный обработчик / подписчик, как показано: https://www.youtube.com/watch?v=qiaC0QMLz5Y

Вы можете распечатать ByteBufferкак они приходят, но нет никакой гарантии, что ByteBuffer соответствует куску. Куски обрабатываются стеком. Один ByteBuffer ломтик будет выдвигаться для каждого чанка, но если в буфере недостаточно места, будет добавлен частичный чанк. Все, что видит потребитель, это поток ByteBufferы, которые содержат данные. Итак, что вы можете сделать, это распечатать эти ByteBufferПо мере их поступления, но вы не можете гарантировать, что они соответствуют ровно одному чанку, который был отправлен сервером.

Примечание. Если тело вашего запроса основано на тексте, вы можете использоватьBodyHandlers.fromLineSubscriber(Subscriber<? super String> subscriber) с обычаем Subscriber<String> это будет печатать каждую строку, как она приходит. BodyHandlers.fromLineSubscriber выполняет жесткое слово декодирования байтов в символы с использованием набора символов, указанного в заголовках ответа, буферизует байты, если это необходимо, до тех пор, пока они не будут декодированы (ByteBuffer может заканчиваться в середине последовательности кодирования, если текст содержит символы, закодированные в нескольких байтах), и расщепление их на границе линии. Метод Subscriber::onNext будет вызываться один раз для каждой строки в тексте. См. https://download.java.net/java/early_access/jdk11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html для получения дополнительной информации.

Это тривиальное решение, основанное на ответе chegar999:

      httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofByteArrayConsumer((final Optional<byte[]> receivedBytesOptional) -> {
    if (receivedBytesOptional.isEmpty()) {
        System.out.println("Done");
    } else {
        final byte[] receivedBytes = receivedBytesOptional.get();
        final String received = new String(receivedBytes, StandardCharsets.UTF_8);
        System.out.println("Received: " + received);
    }
}));

Это работает до тех пор, пока ваши многобайтовые символы не разделены на два фрагмента (в противном случае используйте).

По моему скромному мнению, это проще и менее подвержено ошибкам, чем использование пользовательского подписчика, который больше подходит в случаях, требующих большей гибкости.

Теперь существует новая библиотека Java для удовлетворения таких требований RxSON: https://github.com/rxson/rxson. Он использует JsonPath с RxJava для чтения потоковых фрагментов JSON из ответа, как только они поступают, и их синтаксического анализа. объекты java.

Пример:

String serviceURL = "https://think.cs.vt.edu/corgis/datasets/json/airlines/airlines.json";
   HttpRequest req = HttpRequest.newBuilder(URI.create(serviceURL)).GET().build();
   RxSON rxson = new RxSON.Builder().build();

   String jsonPath = "$[*].Airport.Name";
   Flowable<String> airportStream = rxson.create(String.class, req, jsonPath);
   airportStream
       .doOnNext(it -> System.out.println("Received new item: " + it))
       //Just for test
       .toList()
       .blockingGet();
Другие вопросы по тегам