Перенаправление каналов Lwt
У меня есть служба ssh, работающая на сокете Unix, и у меня есть локальный TCP-сервер, для которого я хочу, чтобы он был направлен на каналы сокета Unix.
В основном, когда я делаю:
$ ssh root@localhost -p 2000
Затем мой локальный TCP-сервер получает запрос и направляет его в сокет Unix, а TCP-клиент, в данном случае ssh, получает ответ из сокета Unix. Соответствующий код:
let running_tunnel debug (tcp_ic, tcp_oc) () =
Lwt_io.with_connection a_unix_addr begin fun (mux_ic, mux_oc) ->
let%lwt _ = some_call with_an_arg
and _ =
(* Some setup code *)
let rec forever () =
Lwt_io.read_line tcp_ic >>= fun opening_message ->
Lwt_io.write_from_string_exactly
mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
Lwt_io.read_line mux_ic >>= fun reply ->
Lwt_io.printl reply >>= fun () ->
Lwt_io.write_line tcp_oc reply >>= fun () ->
forever ()
in
forever ()
in
Lwt.return_unit
end
И этот вид работ. Он застревает, когда я вызываю ssh в командной строке, но я знаю, что получаю некоторые данные, потому что заголовок ssh другой стороны правильный, SSH-2.0-OpenSSH_6.7
, Я также получил свою сторону, чтобы распечатать больше частей начального рукопожатия ssh, то есть я вижу это напечатанным:
??^?W\zJ?~??curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group14-sha1ssh-rsa,ssh-dss>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1none,zlib@openssh.comnone,zlib@openssh.co
и т.д., что кажется правильным. Я понял, что причиной зависания было то, что я использую Lwt_io.read_line
поэтому я попробовал это вместо:
let rec forever () =
Lwt_io.read tcp_ic >>= fun opening_message ->
Lwt_io.write_from_string_exactly
mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
Lwt_io.read mux_ic >>= fun reply ->
Lwt_io.printl reply >>= fun () ->
Lwt_io.write tcp_oc reply >>= fun () ->
forever ()
in
forever ()
Который на самом деле работал хуже, он даже не распечатал первоначальное рукопожатие. Я также попробовал выделенный {write,read}_into
... функционирует с ограниченным успехом. Работая под strace / dtruce, я вижу конечные результаты, такие как:
read(0x6, "SSH-2.0-OpenSSH_6.9\r\n\0", 0x1000) = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.9\n\0", 0x14) = 20 0
read(0x7, "\0", 0x1000) = -1 Err#35
write(0x7, "SSH-2.0-OpenSSH_6.9\0", 0x13) = 19 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0) = 1 0
read(0x7, "SSH-2.0-OpenSSH_6.7\r\n\0", 0x1000) = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.7\n\0", 0x14) = 20 0
read(0x6, "\0", 0x1000) = -1 Err#35
write(0x6, "SSH-2.0-OpenSSH_6.7\n\0", 0x14) = 20 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0) = 1 0
read(0x6, "\0", 0x1000) = 1968 0
read(0x6, "\0", 0x1000) = -1 Err#35
^C
Где 6.9 - это ssh моей локальной машины, а 6.7 - удаленная машина за сокетом Unix. Мне кажется странным то, что \r
удаляется, и это меняет количество операций чтения / записи на 1. Я не уверен, что это может быть решающим различием.
В идеале я бы хотел какую-то абстракцию от Lwt, которая бы говорила всякий раз, когда есть данные, доступные на этом читаемом канале (сокет TCP), записывала их непосредственно в доступный для записи канал (сокет Unix) и наоборот.
1 ответ
Вариант с readline
не работал, так как поток данных является двоичным, и readline
для ввода текста на основе строки. Второй вариант с Lwt_io.read
функция не работает, так как эта функция будет читать весь ввод до конца, если вы не указали необязательный count
параметр. Это означает, что контроль будет передан write
только после EOF на стороне читателя. С помощью Lwt_io.read
с некоторым количеством, например, Lwt_io.read ~count:1024 mux_ic
было бы не очень плохой идеей. Кроме того, вы не должны забывать проверять возвращаемое значение, если ожидаете, что ваш поток будет конечным. read_into
следует использовать с осторожностью, так как, в отличие от read
функция, она не гарантирует, что она будет читать точный объем данных, которые вы запросили. Другими словами, будут короткие чтения. То же самое верно для write_into
функция. _exactly
версии этой функции не страдают от этой проблемы, поэтому лучше использовать их.
Есть еще одна вещь, которую вы должны рассмотреть. Lwt_io
предоставляет интерфейс для буферизованного ввода и вывода. Это означает, что все функции в этом модуле пишут и читают в некоторый внутренний буфер, а не взаимодействуют напрямую с операционной системой через дескриптор устройства. Это означает, что при передаче данных из одного буферизованного источника в другой буферизованный источник у вас будут неожиданные задержки на обоих концах. Таким образом, вы должны предвидеть их, используя приливы. В противном случае они могут ввести условия гонки, когда у вас есть двустороннее взаимодействие.
Более того, хотя буферизованный интерфейс значительно упрощает работу, он имеет свою цену. На самом деле, у вас есть несколько ненужных слоев буферов, когда вы используете Lwt_io
Вы также выделяете много ненужных данных, загружая свою память мусором. Проблема в том, что Lwt_io
имеет собственный внутренний буфер, который он не раскрывает для случайного пользователя, и все функции, которые возвращают данные или потребляют данные, должны выполнять дополнительную операцию копирования во внутреннюю функцию или из нее. Например, используя Lwt_io.{read,write}
, сделаем следующее:
- скопировать данные из ядра во внутренний буфер
- выделить строку
- скопировать данные из внутреннего буфера в выделенную строку
- (теперь
write
часть) скопировать данные из строки во внутренний буфер - скопировать данные из внутреннего буфера в ядро.
- (где-то, а иногда и позже, внутри GC) скопируйте выделенную строку из вспомогательной кучи в большую (если строка была достаточно маленькой, чтобы поместиться в вспомогательную кучу) или скопируйте ее из одного места в другое (если алгоритм сжатия решит переместить ее, и строка все еще жива, это вполне возможно, если производители опережают потребителя, и время жизни прочитанных данных становится довольно большим).
Похоже, что мы можем избавиться от копий в 2, 3, 4 и 6. Мы можем использовать наш собственный буфер и копировать в него данные из ядра, а затем копировать данные из этого ядра обратно в ядро. Мы даже можем избавиться от копий в 1 и 5, используя системные вызовы splice и tee, которые копируют данные непосредственно между буферами ядра без какого-либо вмешательства в пространство пользователя. Но в этом случае мы потеряем способность исследовать данные, и обычно это то, что мы хотим.
Итак, давайте попробуем удалить все копии, кроме копий из пространства ядра. Мы можем использовать низкоуровневый интерфейс для внутреннего буфера в Lwt_io
, лайк direct_access
и недавно добавленный block
функция, но это требует знания внутренних Lwt_io
и не очень тривиально, но все же выполнимо. Вместо этого мы будем использовать более простой подход, который использует Lwt_unix
библиотека. Эта библиотека напрямую взаимодействует с ядром без каких-либо промежуточных буферов, оставляя буферизацию самостоятельно.
open Lwt.Infix
let bufsiz = 32768
let echo ic oc =
let buf = Lwt_bytes.create bufsiz in
let rec loop p =
let p = p mod bufsiz in
Lwt_bytes.read ic buf p (bufsiz - p) >>= function
| 0 -> Lwt.return ()
| n -> Lwt_bytes.write oc buf p n >>= loop in
loop 0
Это позволит реализовать простое и быстрое дублирование данных, которое будет копировать данные с той же скоростью, что и cat
программа. Хотя все еще есть место для улучшения. Например, необходимо добавить обработку ошибок для надежности (в частности, для EINTR
сигнал). Также эта функция реализует синхронное копирование, где вход и выход плотно заблокированы. Иногда это не вариант. Рассмотрим следующий пример: вход является сокетом UDP, который может легко опередить потребителя, и данные будут отброшены, даже если в среднем производитель работает медленнее, чем потребитель. Чтобы справиться с этим, вам нужно разделить читателей и писателей на два отдельных потока, которые общаются через некоторую упругую очередь.
Lwt
довольно низкоуровневая библиотека, которая не решает и не должна решать эти проблемы за вас. Он предоставляет механизмы, которые можно использовать для создания решения для каждого конкретного случая. Есть библиотеки, которые предоставляют решения для некоторых общих шаблонов, 0MQ и наномассажи являются хорошими примерами.
Обновить
Я могу быть парнем слишком низкого уровня, и, может быть, я копаю до глубины души. Если вы действительно ищете подход высокого уровня, то вам следует использовать Lwt_stream
в этом случае вы можете закодировать эквивалент узла foo.pipe(bar).pipe(foo)
как
let echo ic oc = Lwt_io.(write_chars oc (read_chars ic))
Конечно, это будет намного медленнее, но это зависит от вашей задачи.
И да, чтобы выполнить двустороннее перенаправление, вам нужно просто запустить два потока, например так: echo ic oc <&> echo ic oc
для версии с дескрипторами файлов, которые доступны для записи. Если вы используете Lwt_io
каналы, которые являются однонаправленными, как каналы, тогда вы получите две конечные точки для каждой части. Давайте назовем их fi
а также fo
для ввода и вывода внешнего интерфейса соответственно, и bi
, bo
для серверной части. Затем вам нужно подключить его так: echo fo bi <&> echo bo fi
, используя вторую версию echo
с потоками.
Стоимость исполнения
Обычно абстракции высокого уровня сопряжены с затратами на производительность. В нашем конкретном случае, используя первую версию echo, имеет более чем 1Gb
в секунду. Версия с потоками имеет среднюю пропускную способность 5MB/s
, В зависимости от вашей настройки это может работать или не работать. Этого более чем достаточно для обычной сессии ssh, но это может повлиять на scp
в локальных сетях.