Paramiko зависает при выполнении большой команды wget
Привет у меня проблемы с выполнением команды, которая выполняет Wget файла 100 МБ на сервере Ubuntu 10. Более короткие команды работают хорошо, за исключением этого. Приведенный ниже класс содержит информацию о том, как я использую paramiko и мои различные попытки преодоления этой проблемы (см. Различные методы run или exec). В случае exec_cmd выполнение зависает на этой строке:
out = self.in_buffer.read(nbytes, self.timeout)
из метода recv модуля channel.py от paramiko.
Эта же команда wget отлично работает в оболочке, используя обычную утилиту ssh от Mac.
"""
Management of SSH connections
"""
import logging
import os
import paramiko
import socket
import time
import StringIO
class SSHClient():
def __init__(self):
self._ssh_client = paramiko.SSHClient()
self._ssh_client.load_system_host_keys()
self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.time_out = 300
self.wait = 5
def connect(self, hostname, user, pkey):
retry = self.time_out
self.hostname = hostname
logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey))
while retry > 0:
try:
self._ssh_client.connect(hostname,
username=user,
key_filename=os.path.expanduser(pkey),
timeout=self.time_out)
return
except socket.error, (value,message):
if value == 61 or value == 111:
logging.warning('SSH Connection refused, will retry in 5 seconds')
time.sleep(self.wait)
retry -= self.wait
else:
raise
except paramiko.BadHostKeyException:
logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.server.hostname)
logging.warning('Edit that file to remove the entry and then try again')
retry = 0
except EOFError:
logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds')
time.sleep(self.wait)
retry -= self.wait
logging.error('Could not establish SSH connection')
def exists(self, path):
status = self.run('[ -a %s ] || echo "FALSE"' % path)
if status[1].startswith('FALSE'):
return 0
return 1
def shell(self):
"""
Start an interactive shell session on the remote host.
"""
channel = self._ssh_client.invoke_shell()
interactive_shell(channel)
def run(self, command):
"""
Execute a command on the remote host. Return a tuple containing
an integer status and a string containing all output from the command.
"""
logging.info('running:%s on %s' % (command, self.hostname))
log_fp = StringIO.StringIO()
status = 0
try:
t = self._ssh_client.exec_command(command)
except paramiko.SSHException:
logging.error("Error executing command: " + command)
status = 1
log_fp.write(t[1].read())
log_fp.write(t[2].read())
t[0].close()
t[1].close()
t[2].close()
logging.info('output: %s' % log_fp.getvalue())
return (status, log_fp.getvalue())
def run_pty(self, command):
"""
Execute a command on the remote host with a pseudo-terminal.
Returns a string containing the output of the command.
"""
logging.info('running:%s on %s' % (command, self.hostname))
channel = self._ssh_client.get_transport().open_session()
channel.get_pty()
status = 0
try:
channel.exec_command(command)
except:
logging.error("Error executing command: " + command)
status = 1
return status, channel.recv(1024)
def close(self):
transport = self._ssh_client.get_transport()
transport.close()
def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False):
logging.info('running:%s on %s' % (cmd, self.hostname))
ssh = self._ssh_client
chan = ssh.get_transport().open_session()
stdin = chan.makefile('wb')
stdout = chan.makefile('rb')
stderr = chan.makefile_stderr('rb')
processed_cmd = cmd
if use_sudo:
processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
chan.exec_command(processed_cmd)
result = {
'stdout': [],
'stderr': [],
}
exit_status = chan.recv_exit_status()
result['exit_status'] = exit_status
def print_output():
for line in stdout:
result['stdout'].append(line)
logging.info(line)
for line in stderr:
result['stderr'].append(line)
logging.info(line)
if verbose:
print processed_cmd
print_output()
return exit_status,result
def exec_cmd(self, cmd):
import select
ssh = self._ssh_client
channel = ssh.get_transport().open_session()
END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL"
cmd += ";echo " + END
logging.info('running:%s on %s' % (cmd, self.hostname))
channel.exec_command(cmd)
out = ""
buf = ""
while END not in buf:
rl, wl, xl = select.select([channel],[],[],0.0)
if len(rl) > 0:
# Must be stdout
buf = channel.recv(1024)
logging.info(buf)
out += buf
return 0, out
2 ответа
У меня была такая же проблема, мой скрипт на python завис, когда скрипт оболочки, который я запускал на удаленном ssh-клиенте, выполнил команду wget для файла размером 400 МБ.
Я обнаружил, что добавление тайм-аута к команде wget решило проблему. Изначально у меня было:
wget http://blah:8888/file.zip
теперь с этим:
wget -q -T90 http://blah:8888/file.zip
Отлично работает!
Надеюсь, поможет.
- В этом случае я бы пошел с добавлением списка, а затем с конкатенацией. Зачем? Ну, строки неизменны в Python. Это означает, что каждый раз, когда вы используете
+=
вы в основном создаете две новые строки и читаете третью. Если вы создаете список и добавляете его, с другой стороны, вы вдвое сокращаете число созданных строк. - Вам действительно нужно звонить в несколько раз? Насколько я понимаю, вам все равно, если процесс блокирует потоки. поскольку
select
более или менее является оболочкой для метода C с тем же именем:select () и pselect() позволяют программе контролировать несколько файловых дескрипторов, ожидая, пока один или несколько файловых дескрипторов станут "готовыми" для некоторого класса операций ввода-вывода (например, возможен ввод). Файловый дескриптор считается готовым, если возможно выполнить соответствующую операцию ввода-вывода (например, read(2)) без блокировки.
- Вы не слушаете
socket.timeout
Исключение в вашем коде. - Запись в стандартный вывод / файловую систему может быть дорогой, но вы регистрируете каждую строку, возвращаемую
recv
, Вы можете переместить строку журнала? - Рассматривали ли вы обработку чтения канала вручную? Единственный код, который вам технически необходим:
try:
out = self.in_buffer.read(nbytes, self.timeout)
except PipeTimeout, e:
# do something with error
Это не гарантировано, но это исключит дополнительную обработку.