Netty: обратное распространение давления в трубопроводе обработчика канала для замедления отправителя

Я использую netty для разработки приложения, которое будет прослушивать определенный порт через TCP. После получения байтов у меня есть конвейер с бизнес-логикой для работы с полученными байтами. Этот конвейер состоит из нескольких обработчиков каналов, таких как декодер заголовка, обработчик фрагментации на уровне приложения и т. Д. В конце конвейера после обработки сообщения последний обработчик в конвейере (скажем, BufferWriter) помещает обработанное сообщение в очередь блокировки. Эта очередь блокировки действует как буфер, а BufferWriter действует как производитель. Обратите внимание, что эта очередь блокировки является общей для всех каналов. Таким образом, все данные, полученные приложением, будут записаны в одну очередь. Существует еще одна запланированная служба исполнителя, которая будет работать как потребитель этого буфера. Этот потребитель является периодической задачей, которая запускается через определенный промежуток времени и получает сообщение из буфера и записывает в файл.

Проблема, с которой я столкнулся, заключается в том, что при большом количестве входящего трафика мой потребительский поток не может идти в ногу со временем. В результате буферная очередь заполняется.

В netty, есть ли способ замедлить скорость чтения из сокета, чтобы потребитель не отставал от производителя? Примерно так: когда буфер заполнится, netty не будет читать из сокета, и когда в буфере будет свободное место, он возобновит чтение из сокета.

Обратите внимание, что в этом случае отправитель не пишется в java / netty. Это c программа, которая сформирует TCP соединение с моим сервером и начнет отправку данных. Я предполагаю, что, поскольку я замедляю скорость чтения моего сокета сервера, TCP будет автоматически замедлять отправителя, используя политику контроля перегрузки (Медленный запуск)

1 ответ

Для контроля чтения есть конфиг autoRead в Channel, Вы можете установить это в ложь:

ctx.channel().config().setAutoRead(false);

Если вы сделаете это, вам нужно вручную запустить чтение с канала:

ctx.channel().read();

Вы можете замедлить скорость чтения, предварительно установив autoRead в false, затем наличие буфера / счетчика в конце чтения (входящего) и чтение только тогда, когда буфер пуст до его заполнения.

В транспорте WSO2 это делается как здесь с использованием слушателя.

Другие вопросы по тегам