Форк Процесс / Чтение Запись через канал МЕДЛЕННО

ОТВЕТ

/questions/2621203/fork-protsess-chtenie-zapis-cherez-kanal-medlenno/2621223#2621223

это было так тривиально.. аргументы! но много хорошей информации получено. Спасибо всем.

РЕДАКТИРОВАТЬ

ссылка на github: https://github.com/MarkusPfundstein/stream_lame_testing

ОРИГИНАЛЬНАЯ ПОЧТА

У меня есть несколько вопросов, касающихся IPC через конвейеры. Моя цель состоит в том, чтобы получать данные MP3 для каждого потока TCP/IP, передавать их через LAME, чтобы декодировать их в wav, выполнять некоторые математические операции и сохранять их на диске (как wav). Я использую неблокирующие IO для всего этого. Что меня немного раздражает, так это то, что чтение tcp/ip происходит намного быстрее, чем через лабиринт трубопровода. Когда я посылаю ~3 МБ mp3, файл читается на стороне клиента через пару секунд. В начале я также могу записать в stdin процесса lame, затем он перестает писать, он читает остальную часть mp3 и, если он закончен, я могу снова написать в lame. 4096 байт занимают примерно 1 секунду (чтобы писать и читать с хромого). Это довольно медленно, потому что я хочу декодировать мой wav min 128kbs.

ОС является ядром Debian 2.6 на этом микрокомпьютере:

https://www.olimex.com/dev/imx233-olinuxino-maxi.html

65 МБ ОЗУ 400 МГц

ulimit -n | grep pipe возвращает 512 x 8, значит 4096, что нормально. Это 32-битная система.

Странная вещь в том, что

my_process | lame --decode --mp3input - output.wav

идет очень быстро

Вот мой код fork_lame (который обязательно должен связать весь процесс с stdin of lame и наоборот)

static char * const k_lame_args[] = {
  "--decode",
  "--mp3input",
  "-",
  "-",
  NULL
};

static int
fork_lame()
{
  int outfd[2];
  int infd[2];
  int npid;
  pipe(outfd); /* Where the parent is going to write to */
  pipe(infd); /* From where parent is going to read */
  npid = fork();
  if (npid == 0) {
    close(STDOUT_FILENO);
    close(STDIN_FILENO);
    dup2(outfd[0], STDIN_FILENO);
    dup2(infd[1], STDOUT_FILENO);
    close(outfd[0]); /* Not required for the child */
    close(outfd[1]);
    close(infd[0]);
    close(infd[1]);
    if (execv("/usr/local/bin/lame", k_lame_args) == -1) {
      perror("execv");
      return 1;
    }
  } else {
    s_lame_pid = npid;
    close(outfd[0]); /* These are being used by the child */
    close(infd[1]);
    s_lame_fds[WRITE] = outfd[1];
    s_lame_fds[READ] = infd[0];
  }
  return 0;
}

Это функции чтения и записи. Пожалуйста, не то, что в write_lame_in. когда я пишу в stderr вместо s_lame_fds[WRITE], вывод получается почти мгновенно, так что это определенно труба через lame. Но почему?

static int
read_lame_out() 
{
  char buffer[READ_SIZE];
  memset(buffer, 0, sizeof(buffer));
  int i;
  int br = read(s_lame_fds[READ], buffer, sizeof(buffer) - 1);
  fprintf(stderr, "read %d bytes from lame out\n", br);
  return br;
}

static int
write_lame_in()
{
  int bytes_written;
  //bytes_written = write(2, s_data_buf, s_data_len);
  bytes_written = write(s_lame_fds[WRITE], s_data_buf, s_data_len);
  if (bytes_written > 0) {
    //fprintf(stderr, "%d bytes written\n", bytes_written);
    s_data_len -= bytes_written;
    fprintf(stderr, "data_len write: %d\n", s_data_len);
    memmove(s_data_buf, s_data_buf + bytes_written, s_data_len);
    if (s_data_len == 0) {
      fprintf(stderr, "finished\n");
    }
  } 

  return bytes_written;
}

static int
read_tcp_socket(struct connection_s *connection)
{
  char buffer[READ_SIZE];
  int bytes_read;
  bytes_read = connection_read(connection, buffer, sizeof(buffer)-1);
  if (bytes_read > 0) {
    //fprintf(stderr, "read %d bytes\n", bytes_read);
    if (s_data_len + bytes_read > sizeof(s_data_buf)) {
      fprintf(stderr, "BUFFER OVERFLOW\n");
      return -1;
    } else {
      memcpy(s_data_buf + s_data_len,
             buffer,
             bytes_read);
      s_data_len += bytes_read;
    }
    fprintf(stderr, "data_len: %d\n", s_data_len);
  }
  return bytes_read;
}

Материал выбора - довольно простая логика выбора. Все блоки не блокируются, конечно.

Кто-нибудь есть идеи? Буду очень признателен за любую помощь;-)

3 ответа

Решение

К сожалению! Вы проверили выход LAME?

Глядя на ваш код, в частности

static char * const k_lame_args[] = {
  "--decode",
  "--mp3input",
  "-",
  "-",
  NULL
};

а также

if (execv("/usr/local/bin/lame", k_lame_args) == -1) {

означает, что вы случайно опускаете --decode пометить как будет argv[0] для LAME вместо первого аргумента (argv[1]). Вы должны использовать

static char * const k_lame_args[] = {
  /* argv[0] */  "lame",
  /* argv[1] */  "--decode",
  /* argv[2] */  "--mp3input",
  /* argv[3] */  "-",
  /* argv[4] */  "-",
                 NULL
};

вместо.

Я думаю, что вы видите замедление, потому что вы случайно сжимаете аудио MP3. (Я заметил это всего минуту назад, поэтому не проверял, делает ли LAME это, если вы пропустите --decode флаг, но я верю, что это так.)

Возможно, есть какая-то проблема с блокировкой. неблокирующие каналы (на самом деле не являющиеся неблокирующими), в результате чего ваш конец блокируется до тех пор, пока LAME не использует данные.

Не могли бы вы попробовать альтернативный подход? Используйте обычные, блокирующие трубы и отдельную резьбу (используя pthreads), который имеет единственную цель записи данных из кольцевого буфера в LAME. Затем ваш основной поток продолжает заполнять кольцевой буфер из вашего соединения TCP/IP и может также легко отслеживать и сообщать об уровнях буфера, что очень полезно при разработке и отладке. У меня был гораздо больший успех с блокировкой труб и резьб, чем вообще без блокировок.

В Linux потоки на самом деле не несут такой большой нагрузки, поэтому вам должно быть удобно использовать их даже на встроенных архитектурах. Единственный прием, который вы должны освоить, - это указать разумный размер стека для рабочего потока - в этом случае вполне вероятно, что 16384 байта достаточно - потому что только начальный стек, данный процессу, будет автоматически увеличиваться, а стеки потоков по умолчанию исправляются довольно большой.

Вам нужен пример кода?

Отредактировано, чтобы добавить:

Ваша программа получает данные из соединения TCP/IP, вероятно, с постоянной скоростью. Однако LAME потребляет данные большими порциями. Другими словами, ситуация похожа на буксируемую машину, когда буксирующая машина дергается и останавливается, и каждый раз рывок дергается в нее: и ваш процесс, и LAME большую часть времени ждут, пока другой получит / отправит больше данных.

Во-первых, эти два закрытия не требуются (на самом деле, вы не должны этого делать), потому что два dup2 следующий будет делать это автоматически:

close(STDOUT_FILENO);
close(STDIN_FILENO);
Другие вопросы по тегам