RxNetty повторно использовать соединение

Я хочу использовать Netflix-Ribbon в качестве балансировщика нагрузки TCP-клиента без Spring Cloud, и я пишу тестовый код.

public class App  implements Runnable
{
    public static String msg = "hello world";
    public BaseLoadBalancer lb;
    public RxClient<ByteBuf, ByteBuf >  client;
    public Server echo;

    App(){
        lb = new BaseLoadBalancer();
        echo = new Server("localhost", 8000);
        lb.setServersList(Lists.newArrayList(echo));
        DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
        client = RibbonTransport.newTcpClient(lb, impl);
    }
    public static void main( String[] args ) throws Exception
    {
        for( int i = 40; i > 0; i--)
        {
            Thread t = new Thread(new App());
            t.start();
            t.join();
        }
        System.out.println("Main thread is finished");
    }
    public String sendAndRecvByRibbon(final String data) 
    {
        String response = "";
        try {
            response = client.connect().flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>,
                    Observable<ByteBuf>>() {
                public Observable<ByteBuf> call(ObservableConnection<ByteBuf, ByteBuf> connection) {

                    connection.writeStringAndFlush(data);
                    return connection.getInput();
                }
            }).timeout(1, TimeUnit.SECONDS).retry(1).take(1)
                    .map(new Func1<ByteBuf, String>() {
                        public String call(ByteBuf ByteBuf) {
                            return ByteBuf.toString(Charset.defaultCharset());
                        }
                    })
                    .toBlocking()
                    .first();
        }
        catch (Exception e) {
            System.out.println(((LoadBalancingRxClientWithPoolOptions) client).getMaxConcurrentRequests());
            System.out.println(lb.getLoadBalancerStats());
        }
        return response;
    }
    public void run() {
        for (int i = 0; i < 200; i++) {
            sendAndRecvByRibbon(msg);
        }
    }

}

я нахожу это будет создавать новый сокет каждый раз, когда я звонюsendAndRecvByRibbon хотя poolEnabled устанавливает в истину. Итак, это смущает меня, я что-то пропустил? и нет возможности настроить размер пула, но есть PoolMaxThreads а также MaxConnectionsPerHost,

У меня вопрос, как использовать пул соединений в моем простом коде, и что не так с моим sendAndRecvByRibbon, он открывает сокет, а затем использовать его только один раз, как я могу использовать соединение? Спасибо за ваше время.

сервер - просто простой эхо-сервер, пишущий на pyhton3, я комментируюconn.close() потому что я хочу использовать длинное соединение.

import socket
import threading
import time
import socketserver
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        while True:
            client_data = conn.recv(1024)
            if not client_data:
                time.sleep(5)
            conn.sendall(client_data)
#            conn.close()

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

if __name__ == "__main__":
    HOST, PORT = "localhost", 8000
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    server.serve_forever()

и pom of mevan, я просто добавляю две зависимости в автоматически сгенерированном POM IED.

<dependency>
    <groupId>commons-configuration</groupId>
    <artifactId>commons-configuration</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon</artifactId>
    <version>2.2.2</version>
</dependency>

код для печати src_port

@Sharable
public class InHandle extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().localAddress());
        super.channelRead(ctx, msg);
    }
}

public class Pipeline implements PipelineConfigurator<ByteBuf, ByteBuf> {
    public InHandle handler;
    Pipeline() {
        handler = new InHandle();
    }
    public void configureNewPipeline(ChannelPipeline pipeline) {
        pipeline.addFirst(handler);
    }
}

и изменить client = RibbonTransport.newTcpClient(lb, impl);в Pipeline pipe = new Pipeline();client = RibbonTransport.newTcpClient(lb, pipe, impl, new DefaultLoadBalancerRetryHandler(impl));

1 ответ

Так что ваши App() Конструктор выполняет инициализацию lb/client/etc.

Затем вы запускаете 40 различных потоков с 40 различными экземплярами RxClient (каждый экземпляр имеет собственный пул по умолчанию), вызывая new App() в первом за цикл. Чтобы прояснить ситуацию - то, как вы создаете здесь несколько экземпляров RxClient, не позволяет им совместно использовать какой-либо общий пул. Попробуйте использовать один экземпляр RxClient.

Что если вы измените свой основной метод, как показано ниже, он прекратит создавать дополнительные сокеты?

    public static void main( String[] args ) throws Exception
    {
        App app = new App() // Create things just once
        for( int i = 40; i > 0; i--)
        {
            Thread t = new Thread(()->app.run()); // pass the run()
            t.start();
            t.join();
        }
        System.out.println("Main thread is finished");
    }

Если вышеуказанное не поможет полностью (по крайней мере, это уменьшит количество созданных сокетов в 40 раз) - не могли бы вы уточнить, как именно вы определите, что:

я нахожу, что он будет создавать новый сокет каждый раз, когда я вызываю sendAndRecvByRibbon

и каковы ваши измерения после обновления конструктора этой строкой:

    DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
    impl.set(CommonClientConfigKey.PoolMaxThreads,1); //Add this one and test

Обновить

Да, глядя на sendAndRecvByRibbon кажется, что ему не хватает маркировки PooledConnection как больше не полученной при вызове close как только вы не ожидаете дальнейших чтений от него.

Пока вы ожидаете единственное событие чтения, просто измените эту строку

connection.getInput()

к

return connection.getInput().zipWith(Observable.just(connection), new Func2<ByteBuf, ObservableConnection<ByteBuf, ByteBuf>, ByteBuf>() {
                        @Override
                        public ByteBuf call(ByteBuf byteBuf, ObservableConnection<ByteBuf, ByteBuf> conn) {
                            conn.close();
                            return byteBuf; 
                        }
                    });

Обратите внимание: если вы разрабатываете более сложный протокол по протоколу TCP, входной bytebuf может быть проанализирован на предмет вашего конкретного знака "конец связи", который указывает, что соединение может быть возвращено в пул.

Другие вопросы по тегам