Форк Процесс / Чтение Запись через канал МЕДЛЕННО
ОТВЕТ
/questions/2621203/fork-protsess-chtenie-zapis-cherez-kanal-medlenno/2621223#2621223
это было так тривиально.. аргументы! но много хорошей информации получено. Спасибо всем.
РЕДАКТИРОВАТЬ
ссылка на github: https://github.com/MarkusPfundstein/stream_lame_testing
ОРИГИНАЛЬНАЯ ПОЧТА
У меня есть несколько вопросов, касающихся IPC через конвейеры. Моя цель состоит в том, чтобы получать данные MP3 для каждого потока TCP/IP, передавать их через LAME, чтобы декодировать их в wav, выполнять некоторые математические операции и сохранять их на диске (как wav). Я использую неблокирующие IO для всего этого. Что меня немного раздражает, так это то, что чтение tcp/ip происходит намного быстрее, чем через лабиринт трубопровода. Когда я посылаю ~3 МБ mp3, файл читается на стороне клиента через пару секунд. В начале я также могу записать в stdin процесса lame, затем он перестает писать, он читает остальную часть mp3 и, если он закончен, я могу снова написать в lame. 4096 байт занимают примерно 1 секунду (чтобы писать и читать с хромого). Это довольно медленно, потому что я хочу декодировать мой wav min 128kbs.
ОС является ядром Debian 2.6 на этом микрокомпьютере:
https://www.olimex.com/dev/imx233-olinuxino-maxi.html
65 МБ ОЗУ 400 МГц
ulimit -n | grep pipe возвращает 512 x 8, значит 4096, что нормально. Это 32-битная система.
Странная вещь в том, что
my_process | lame --decode --mp3input - output.wav
идет очень быстро
Вот мой код fork_lame (который обязательно должен связать весь процесс с stdin of lame и наоборот)
static char * const k_lame_args[] = {
"--decode",
"--mp3input",
"-",
"-",
NULL
};
static int
fork_lame()
{
int outfd[2];
int infd[2];
int npid;
pipe(outfd); /* Where the parent is going to write to */
pipe(infd); /* From where parent is going to read */
npid = fork();
if (npid == 0) {
close(STDOUT_FILENO);
close(STDIN_FILENO);
dup2(outfd[0], STDIN_FILENO);
dup2(infd[1], STDOUT_FILENO);
close(outfd[0]); /* Not required for the child */
close(outfd[1]);
close(infd[0]);
close(infd[1]);
if (execv("/usr/local/bin/lame", k_lame_args) == -1) {
perror("execv");
return 1;
}
} else {
s_lame_pid = npid;
close(outfd[0]); /* These are being used by the child */
close(infd[1]);
s_lame_fds[WRITE] = outfd[1];
s_lame_fds[READ] = infd[0];
}
return 0;
}
Это функции чтения и записи. Пожалуйста, не то, что в write_lame_in. когда я пишу в stderr вместо s_lame_fds[WRITE], вывод получается почти мгновенно, так что это определенно труба через lame. Но почему?
static int
read_lame_out()
{
char buffer[READ_SIZE];
memset(buffer, 0, sizeof(buffer));
int i;
int br = read(s_lame_fds[READ], buffer, sizeof(buffer) - 1);
fprintf(stderr, "read %d bytes from lame out\n", br);
return br;
}
static int
write_lame_in()
{
int bytes_written;
//bytes_written = write(2, s_data_buf, s_data_len);
bytes_written = write(s_lame_fds[WRITE], s_data_buf, s_data_len);
if (bytes_written > 0) {
//fprintf(stderr, "%d bytes written\n", bytes_written);
s_data_len -= bytes_written;
fprintf(stderr, "data_len write: %d\n", s_data_len);
memmove(s_data_buf, s_data_buf + bytes_written, s_data_len);
if (s_data_len == 0) {
fprintf(stderr, "finished\n");
}
}
return bytes_written;
}
static int
read_tcp_socket(struct connection_s *connection)
{
char buffer[READ_SIZE];
int bytes_read;
bytes_read = connection_read(connection, buffer, sizeof(buffer)-1);
if (bytes_read > 0) {
//fprintf(stderr, "read %d bytes\n", bytes_read);
if (s_data_len + bytes_read > sizeof(s_data_buf)) {
fprintf(stderr, "BUFFER OVERFLOW\n");
return -1;
} else {
memcpy(s_data_buf + s_data_len,
buffer,
bytes_read);
s_data_len += bytes_read;
}
fprintf(stderr, "data_len: %d\n", s_data_len);
}
return bytes_read;
}
Материал выбора - довольно простая логика выбора. Все блоки не блокируются, конечно.
Кто-нибудь есть идеи? Буду очень признателен за любую помощь;-)
3 ответа
К сожалению! Вы проверили выход LAME?
Глядя на ваш код, в частности
static char * const k_lame_args[] = {
"--decode",
"--mp3input",
"-",
"-",
NULL
};
а также
if (execv("/usr/local/bin/lame", k_lame_args) == -1) {
означает, что вы случайно опускаете --decode
пометить как будет argv[0]
для LAME вместо первого аргумента (argv[1]
). Вы должны использовать
static char * const k_lame_args[] = {
/* argv[0] */ "lame",
/* argv[1] */ "--decode",
/* argv[2] */ "--mp3input",
/* argv[3] */ "-",
/* argv[4] */ "-",
NULL
};
вместо.
Я думаю, что вы видите замедление, потому что вы случайно сжимаете аудио MP3. (Я заметил это всего минуту назад, поэтому не проверял, делает ли LAME это, если вы пропустите --decode
флаг, но я верю, что это так.)
Возможно, есть какая-то проблема с блокировкой. неблокирующие каналы (на самом деле не являющиеся неблокирующими), в результате чего ваш конец блокируется до тех пор, пока LAME не использует данные.
Не могли бы вы попробовать альтернативный подход? Используйте обычные, блокирующие трубы и отдельную резьбу (используя pthreads
), который имеет единственную цель записи данных из кольцевого буфера в LAME. Затем ваш основной поток продолжает заполнять кольцевой буфер из вашего соединения TCP/IP и может также легко отслеживать и сообщать об уровнях буфера, что очень полезно при разработке и отладке. У меня был гораздо больший успех с блокировкой труб и резьб, чем вообще без блокировок.
В Linux потоки на самом деле не несут такой большой нагрузки, поэтому вам должно быть удобно использовать их даже на встроенных архитектурах. Единственный прием, который вы должны освоить, - это указать разумный размер стека для рабочего потока - в этом случае вполне вероятно, что 16384 байта достаточно - потому что только начальный стек, данный процессу, будет автоматически увеличиваться, а стеки потоков по умолчанию исправляются довольно большой.
Вам нужен пример кода?
Отредактировано, чтобы добавить:
Ваша программа получает данные из соединения TCP/IP, вероятно, с постоянной скоростью. Однако LAME потребляет данные большими порциями. Другими словами, ситуация похожа на буксируемую машину, когда буксирующая машина дергается и останавливается, и каждый раз рывок дергается в нее: и ваш процесс, и LAME большую часть времени ждут, пока другой получит / отправит больше данных.
Во-первых, эти два закрытия не требуются (на самом деле, вы не должны этого делать), потому что два dup2
следующий будет делать это автоматически:
close(STDOUT_FILENO);
close(STDIN_FILENO);