Paramiko зависает при выполнении большой команды wget

Привет у меня проблемы с выполнением команды, которая выполняет Wget файла 100 МБ на сервере Ubuntu 10. Более короткие команды работают хорошо, за исключением этого. Приведенный ниже класс содержит информацию о том, как я использую paramiko и мои различные попытки преодоления этой проблемы (см. Различные методы run или exec). В случае exec_cmd выполнение зависает на этой строке:

        out = self.in_buffer.read(nbytes, self.timeout)

из метода recv модуля channel.py от paramiko.

Эта же команда wget отлично работает в оболочке, используя обычную утилиту ssh от Mac.

"""
Management of SSH connections
"""

import logging
import os
import paramiko
import socket
import time
import StringIO


class SSHClient():
    def __init__(self):
        self._ssh_client = paramiko.SSHClient()
        self._ssh_client.load_system_host_keys()
        self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.time_out = 300
        self.wait = 5

    def connect(self, hostname, user, pkey):
        retry = self.time_out
        self.hostname = hostname
        logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey))
        while retry > 0:
            try:
                self._ssh_client.connect(hostname,
                                         username=user,
                                         key_filename=os.path.expanduser(pkey),
                                         timeout=self.time_out)
                return
            except socket.error, (value,message):
                if value == 61 or value == 111:
                    logging.warning('SSH Connection refused, will retry in 5 seconds')
                    time.sleep(self.wait)
                    retry -= self.wait
                else:
                    raise
            except paramiko.BadHostKeyException:
                logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.server.hostname)
                logging.warning('Edit that file to remove the entry and then try again')
                retry = 0
            except EOFError:
                logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds')
                time.sleep(self.wait)
                retry -= self.wait
        logging.error('Could not establish SSH connection')

    def exists(self, path):
        status = self.run('[ -a %s ] || echo "FALSE"' % path)
        if status[1].startswith('FALSE'):
            return 0
        return 1

    def shell(self):
        """
        Start an interactive shell session on the remote host.
        """
        channel = self._ssh_client.invoke_shell()
        interactive_shell(channel)

    def run(self, command):
        """
        Execute a command on the remote host.  Return a tuple containing
        an integer status and a string containing all output from the command.
        """
        logging.info('running:%s on %s' % (command, self.hostname))
        log_fp = StringIO.StringIO()
        status = 0
        try:
            t = self._ssh_client.exec_command(command)
        except paramiko.SSHException:
            logging.error("Error executing command: " + command)
            status = 1
        log_fp.write(t[1].read())
        log_fp.write(t[2].read())
        t[0].close()
        t[1].close()
        t[2].close()
        logging.info('output: %s' % log_fp.getvalue())
        return (status, log_fp.getvalue())

    def run_pty(self, command):
        """
        Execute a command on the remote host with a pseudo-terminal.
        Returns a string containing the output of the command.
        """
        logging.info('running:%s on %s' % (command, self.hostname))
        channel = self._ssh_client.get_transport().open_session()
        channel.get_pty()
        status = 0
        try:
            channel.exec_command(command)
        except:
            logging.error("Error executing command: " + command)
            status = 1
        return status, channel.recv(1024)

    def close(self):
        transport = self._ssh_client.get_transport()
        transport.close()

    def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False):
        logging.info('running:%s on %s' % (cmd, self.hostname))
        ssh = self._ssh_client
        chan = ssh.get_transport().open_session()
        stdin = chan.makefile('wb')
        stdout = chan.makefile('rb')
        stderr = chan.makefile_stderr('rb')
        processed_cmd = cmd
        if use_sudo:
            processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
        chan.exec_command(processed_cmd)
        result = {
            'stdout': [],
            'stderr': [],
        }
        exit_status = chan.recv_exit_status()
        result['exit_status'] = exit_status

        def print_output():
            for line in stdout:
                result['stdout'].append(line)
                logging.info(line)
            for line in stderr:
                result['stderr'].append(line)
                logging.info(line)
        if verbose:
            print processed_cmd
            print_output()
        return exit_status,result 

    def exec_cmd(self, cmd):
        import select
        ssh = self._ssh_client
        channel = ssh.get_transport().open_session()
        END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL"
        cmd += ";echo " + END
        logging.info('running:%s on %s' % (cmd, self.hostname))
        channel.exec_command(cmd)
        out = ""
        buf = ""
        while END not in buf:
          rl, wl, xl = select.select([channel],[],[],0.0)
          if len(rl) > 0:
              # Must be stdout
              buf = channel.recv(1024)
              logging.info(buf)
              out += buf
        return 0, out

2 ответа

У меня была такая же проблема, мой скрипт на python завис, когда скрипт оболочки, который я запускал на удаленном ssh-клиенте, выполнил команду wget для файла размером 400 МБ.

Я обнаружил, что добавление тайм-аута к команде wget решило проблему. Изначально у меня было:

wget http://blah:8888/file.zip

теперь с этим:

wget -q -T90 http://blah:8888/file.zip

Отлично работает!

Надеюсь, поможет.

  1. В этом случае я бы пошел с добавлением списка, а затем с конкатенацией. Зачем? Ну, строки неизменны в Python. Это означает, что каждый раз, когда вы используете += вы в основном создаете две новые строки и читаете третью. Если вы создаете список и добавляете его, с другой стороны, вы вдвое сокращаете число созданных строк.
  2. Вам действительно нужно звонить в несколько раз? Насколько я понимаю, вам все равно, если процесс блокирует потоки. поскольку select более или менее является оболочкой для метода C с тем же именем:

    select () и pselect() позволяют программе контролировать несколько файловых дескрипторов, ожидая, пока один или несколько файловых дескрипторов станут "готовыми" для некоторого класса операций ввода-вывода (например, возможен ввод). Файловый дескриптор считается готовым, если возможно выполнить соответствующую операцию ввода-вывода (например, read(2)) без блокировки.

  3. Вы не слушаете socket.timeout Исключение в вашем коде.
  4. Запись в стандартный вывод / файловую систему может быть дорогой, но вы регистрируете каждую строку, возвращаемую recv, Вы можете переместить строку журнала?
  5. Рассматривали ли вы обработку чтения канала вручную? Единственный код, который вам технически необходим:
try:
    out = self.in_buffer.read(nbytes, self.timeout)
except PipeTimeout, e:
    # do something with error

Это не гарантировано, но это исключит дополнительную обработку.

Другие вопросы по тегам