Async Http Client + Netty
Я сталкиваюсь с проблемами при тестировании асинхронного http-клиента (версия - 2.4.3) и получаю следующее исключение при выполнении кода, вставленного ниже. Также, к вашему сведению, я новичок в Netty и async-http-client, так что извините меня, если у меня возникнет пробел в моем понимании.
exception - io.netty.channel.ConnectTimeoutException: connection timed out:
Ниже приведены мои предположения -
- В приведенном ниже коде будет один поток (единственный eventloop).
- keepalive обеспечит повторное использование TCP-соединений
- Размер соединений tcp определяется setMaxConnections & setMaxConnectionsPerHost
- ThrottleRequestFilter гарантирует, что запрос не будет выполнен, когда все соединения активно используются потоками, обрабатывающими код обработчика.
Вот мой вопрос -
1] Хотя я использую ThrottleRequestFilter, почему я должен видеть тайм-аут соединения. Я предполагаю, что после обработки ответа обработчиком, соединение должно быть доступно для повторного использования, так как у меня есть набор keepalive, если я что-то упустил.
2] Каков размер пула потоков по умолчанию, который выполняет код AsyncHandler.
Вот мой тестовый прогон
Total Requests - N = 1000
Max Con = 1000
Max Con/Host = 1000
pcit = 60000
ka = true
и следующие метрики -
======================================
Total num of Reqs: 1000
Concurrency : 1000
Total Time in secs: 1.0
HTTP 200 OK: 789
HTTP 200 NOT OK: 211
Total Completed : 1000
rps : 789.0
======================================
public class HttpBm {
final Metrics metrics;
long startTime = System.nanoTime();
public HttpBm(Metrics metrics) {
this.metrics = metrics;
}
private void run(int n, int mc, int mcph, int pcit, boolean ka, String url) {
AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
.addRequestFilter(new ThrottleRequestFilter(mcph))
.setMaxConnections(mc)
.setMaxConnectionsPerHost(mcph)
.setKeepAlive(ka)
.setConnectTimeout(1000)
.setConnectionTtl(500));
for (int r=0;r<n;r++) {
final ListenableFuture<Response> whenResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
private Integer status;
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
if (200 == responseStatus.getStatusCode())
metrics.incrHttp200OK();
else
metrics.incrHttpNon200OK();
return State.ABORT;
}
public State onHeadersReceived(HttpHeaders headers) throws Exception {
return State.ABORT;
}
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
return State.ABORT;
}
public void onThrowable(Throwable t) {
metrics.incrHttpNon200OK();
t.printStackTrace();
}
public Response onCompleted() throws Exception {
return null;
}
});
}
long endTime = System.nanoTime();
boolean done = true;
while (done) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
done = metrics.print(startTime, endTime);
}
}
public static void main(String[] args) {
int n = 1000;
int mc = 1000;
int mcph = 1000;
int pcit = 60000;
String url = "http://somehost:8080/index.html";
if (args != null) {
n = Integer.parseInt(args[0]);
mc = Integer.parseInt(args[1]);
mcph = Integer.parseInt(args[2]);
pcit = Integer.parseInt(args[3]);
url = args[4];
}
System.out.println();
System.out.println("==================================");
System.out.println("Url = " + url);
System.out.println("Total Requests - N = " + n);
System.out.println("Max Con = " + mc);
System.out.println("Max Con/Host = " + mcph);
System.out.println("pcit = " + pcit);
System.out.println("ka = true");
System.out.println("==================================");
HttpBm httpBm = new HttpBm(new Metrics(n, mc));
httpBm.run(n , mc, mcph, pcit, true, url);
}
}
import java.util.concurrent.atomic.AtomicInteger;
public class Metrics {
private int n;
private int c;
private AtomicInteger Http200OK = new AtomicInteger();
private AtomicInteger HttpNon200OK = new AtomicInteger();
public Metrics(int n, int c) {
this.n = n;
this.c = c;
}
public void incrHttp200OK() {
Http200OK.incrementAndGet();
}
public void incrHttpNon200OK() {
HttpNon200OK.incrementAndGet();
}
public boolean isComplete() {
return Http200OK.get() + HttpNon200OK.get() >= n;
}
public boolean print(long startTime, long finishTime) {
final float totalTimeSec = (finishTime - startTime) / 1000000000;
final float rps = this.Http200OK.get() / totalTimeSec;
System.out.println("======================================");
System.out.println("Total num of Reqs:\t" + n);
System.out.println("Concurrency :\t" + c);
System.out.println("Total Time in secs:\t" + totalTimeSec);
System.out.println("HTTP 200 OK:\t" + Http200OK.get());
System.out.println("HTTP 200 NOT OK:\t" + HttpNon200OK.get());
System.out.println("Total Completed :\t" + (Http200OK.get() + HttpNon200OK.get()));
System.out.println("rps : " + rps);
System.out.println("======================================");
return isComplete();
}
}