Как обрабатывать ошибку из fs readline.Interface async iterator
На примере processLineByLine() я заметил, что мы не можем отловить ошибку, если данное имя файла не существует. В этом случае программа завершается чем-то вроде:
UnhandledPromiseRejectionWarning: Ошибка: ENOENT: нет такого файла или каталога
Итак, самый простой подход, который я использовал, чтобы вызвать обнаруживаемую ошибку, состоял в том, чтобы внести две модификации в processLineByLine()
функция:
- превратить его в генератор, такой как
function*
await
проверка наличия файлаawait access(filename, fs.constants.F_OK)
Наконец мне пришлось преобразовать readline.Interface
экземпляр к асинхронному генератору. Последняя часть мне особенно не нравится. Результирующийlines()
функция похожа на:
export async function* lines(filename) {
await access(filename, fs.constants.F_OK)
const lines = readline.createInterface({
input: fs.createReadStream(filename),
crlfDelay: Infinity
})
for await (const l of lines) {
yield l
}
}
Вопрос: есть ли лучший способ сделатьlines()
либо вернуть асинхронный итератор, либо выдать ошибку, если имя файла не существует?
Отчет об ошибке : Что касается наблюдений @jfriend00, я открыл проблему с ошибкой на nodejs: https://github.com/nodejs/node/issues/30831
3 ответа
Хм, это непросто. Даже определение того, существует ли файл в качестве предполетного, не гарантирует, что вы сможете его успешно открыть (он может быть заблокирован или иметь проблемы с разрешениями), а определение того, существует ли он перед открытием, является классическим состоянием гонки при разработке сервера (небольшое окно, но все равно состояние гонки).
Я все еще думаю, что должен быть лучший способ избавиться от ошибки fs.createReadStream()
, но единственный способ, который я смог найти, - это заключить его в обещание, которое разрешается только после успешного открытия файла. Это позволяет получить ошибку при открытии файла и передать ее обратно вызывающей стороне вашегоasync
функция. Вот как это будет выглядеть:
const fs = require('fs');
const readline = require('readline');
function createReadStreamSafe(filename, options) {
return new Promise((resolve, reject) => {
const fileStream = fs.createReadStream(filename, options);
fileStream.on('error', reject).on('open', () => {
resolve(filestream);
});
});
}
async function processLineByLine(f) {
const fileStream = await createReadStreamSafe(f);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}
processLineByLine("nofile").catch(err => {
console.log("caught error");
});
Это делает так, что обещание, что processLineByLine()
возврат будет отклонен, и вы сможете обработать ошибку, о которой, я думаю, вы просили. Если я неправильно понял, о чем вы просили, поясните, пожалуйста.
К вашему сведению, мне кажется, что это ошибка в readline.createInterface()
потому что кажется, что он должен отклонить на первой итерации for await (const line of rl)
, но, похоже, этого не происходит.
Таким образом, как следствие этого, даже этот обходной путь не обнаружит ошибок чтения в потоке после его открытия. Это действительно нужно исправить внутриcreateInterface()
. Я согласен, что ошибка открытия файла или ошибка чтения должны отображаться как отклонение наfor await (const line of rl)
.
Другой способ решения проблемы с открытием файла - это предварительно открыть файл с помощью await fs.promises.open(...)
и пройти fd
к fs.createReadStream
и тогда вы сами увидите ошибку при открытии.
Другое решение - упаковка итератора readLine для добавления обработки ошибок
Предупреждение, это в конечном итоге выглядит как взлом, но это действительно интересный учебный проект, потому что мне пришлось обернуть строку чтения asyncIterator
с моим, чтобы отклонить, когда я обнаружил ошибку на readStream
(обработка ошибок, readline
библиотека отсутствует).
Я отправился на миссию, чтобы выяснить, как написать processLineByLine()
функция, которая вернет asyncIterator
который будет правильно отклонять ошибки потока (даже если readline
код имеет ошибки в этом отношении), все еще используя внутреннюю библиотеку readline.
Целью было написать такой код:
for await (let line of processLineByLine("somefile1.txt")) {
console.log(line);
}
который правильно обрабатывает ошибки в readStream, используемом внутри, независимо от того, не существует ли файл, существует, но не может быть открыт или даже обнаруживает ошибку чтения позже при чтении. Поскольку я не меняю / не исправляю код интерфейса readline внутри, мне пришлось установить свой собственныйerror
слушатель в readStream, и когда я вижу там ошибку, мне нужно вызвать отклонение любых ожидающих или будущих обещаний из интерфейса readline.
Вот что у меня получилось:
// This is an experiment to wrap the lines asyncIterator with our own iterator
// so we can reject when there's been an error on the readStream. It's really
// ugly, but does work.
const fs = require('fs');
const readline = require('readline');
function processLineByLine(filename, options = {}) {
const fileStream = fs.createReadStream(filename, options);
let latchedError = null;
let kill = new Set();
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
const lines = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// create our own little asyncIterator that wraps the lines asyncIterator
// so we can reject when we need to
function asyncIterator() {
const linesIterator = lines[Symbol.asyncIterator]();
return {
next: function() {
if (latchedError) {
return Promise.reject(latchedError);
} else {
return new Promise((resolve, reject) => {
// save reject handlers in higher scope so they can be called
// from the stream error handler
kill.add(reject);
let p = linesIterator.next();
// have our higher level promise track the iterator promise
// except when we reject it from the outside upon stream error
p.then((data => {
// since we're resolving now, let's removing our reject
// handler from the kill storage. This will allow this scope
// to be properly garbage collected
kill.delete(reject);
resolve(data);
}), reject);
});
}
}
}
}
var asyncIterable = {
[Symbol.asyncIterator]: asyncIterator
};
return asyncIterable;
}
async function runIt() {
for await (let line of processLineByLine("xfile1.txt")) {
console.log(line);
}
}
runIt().then(() => {
console.log("done");
}).catch(err => {
console.log("final Error", err);
});
Некоторое объяснение того, как это работает...
Собственный мониторинг ошибок в потоке
Во-первых, вы можете увидеть это:
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
Это наш собственный мониторинг ошибок в readStream, чтобы восполнить недостающую обработку ошибок внутри readline. Каждый раз, когда мы видим ошибку, мы сохраняем ее в переменной с более высокой областью видимости для возможного дальнейшего использования, и, если есть какие-либо ожидающие обещания, зарегистрированные из строки чтения для этого потока, мы "убиваем" их (что отклоняет их, позже вы увидите, как это работает).
Нет специальной обработки ошибок открытия файлов
Частично цель заключалась в том, чтобы избавиться от специальной обработки в предыдущем решении для ошибок открытия файлов. Мы хотим, чтобы ЛЮБАЯ ошибка в readStream вызывала отклонение asyncIterable, поэтому это гораздо более универсальный механизм. ошибка открытия файла попадает в эту обработку ошибок точно так же, как и любая другая ошибка чтения.
Наши собственные asyncIterable и asyncIterator
Вызов readline.createInterace()
возвращает asyncIterable. Это в основном то же самое, что и обычная итерация, в том, что вы вызываете у нее специальное свойство, чтобы получитьasyncIterator
. КоторыйasyncIterator
имеет .next()
свойство на нем так же, как и обычный итератор, за исключением случаев, когда asyncIterator.next()
вызывается, он возвращает обещание, которое разрешается в объект, а не в объект.
Итак, вот как for await (let line of lines)
работает. Сначала звонитlines[Symbol.asyncIterator]()
чтобы получить asyncIterator. Затем на этомasyncIterator
что он возвращается, он неоднократно делает await asyncIterator.next()
ожидая обещания, что asyncIterator.next()
возвращается.
Сейчас, readline.createInterface()
уже возвращает такой asyncIterable
. Но это работает не совсем правильно. КогдаreadStream
получает ошибку, он не отклоняет обещание, возвращенное .next()
на каждой итерации. Фактически, это обещание никогда не отклоняется и не выполняется. Итак, все застопорилось. В моем тестовом приложении приложение просто выйдет, потому что readStream был выполнен (после ошибки), и больше не было ничего, что удерживало бы приложение от выхода, даже если обещание все еще находилось в ожидании.
Итак, мне нужен был способ заставить это обещание, что readlineIterator.next()
ранее вернулся и в настоящее время ожидает for await (...)
быть отвергнутым. Что ж, обещание не предоставляет внешний интерфейс для его отклонения, и у нас нет доступа к внутренним компонентамreadline
реализация, где есть возможность отклонить ее.
Мое решение заключалось в том, чтобы обернуть readlineIterator моим собственным как своего рода прокси. Затем мы, мой собственный детектор ошибок, обнаруживаем ошибку и есть невыполненные обещания из строки чтения, я могу использовать свой прокси / оболочку, чтобы принудительно отклонить эти невыполненные обещания. Это вызоветfor await (...)
чтобы увидеть отклонение и получить правильную ошибку. И это работает.
Мне потребовалось время, чтобы узнать достаточно о том, как asyncIterators
работать, чтобы можно было обернуть один. Я очень благодарен этой статье об асинхронных итераторах в JavaScript, в которой представлены очень полезные примеры кода для создания собственных asyncIterable и asyncIterator. На самом деле именно здесь и произошло настоящее обучение в этом упражнении, и где другие могут узнать, поняв, как это работает в приведенном выше коде.
Принудительное отклонение завернутого обещания
"Уродство" в этом коде заключается в том, что обещание принудительно отклоняется извне за пределы обычной области действия обработчика отклонения этого обещания. Это достигается путем сохранения обработчика отклонения в области более высокого уровня, где выполняется обработка ошибок дляreadStream
может вызвать триггер, который обещает отклонить. Может быть более элегантный способ кодирования этого, но он работает.
Делаем наш собственный asyncIterable
Асинхронный итерабельный объект - это просто объект, имеющий одно свойство с именем [Symbol.asyncIterator]
. Это свойство должно быть функцией, которая при вызове без аргументов возвращаетasyncIterator.
Итак, вот наши asyncIterable
.
var asyncIterable = {
[Symbol.asyncIterator]: asyncIterator
};
Создание собственного asyncIterator
An asyncIterator
это функция, которая при вызове возвращает объект с next()
собственность на нем. Каждый разobj.next()
вызывается, он возвращает обещание, которое преобразуется в обычный объект кортежа итератора {done, value}
. Нам не нужно беспокоиться о разрешенном значении, потому что мы просто получим его из итератора строки чтения. Итак, вот нашиasyncIterator
:
// create our own little asyncIterator that wraps the lines asyncIterator
// so we can reject when we need to
function asyncIterator() {
const linesIterator = lines[Symbol.asyncIterator]();
return {
next: function() {
if (latchedError) {
return Promise.reject(latchedError);
} else {
return new Promise((resolve, reject) => {
// save reject handlers in higher scope so they can be called
// from the stream error handler
kill.push(reject);
let p = linesIterator.next();
// have our higher level promise track the iterator promise
// except when we reject it from the outside upon stream error
p.then(resolve, reject);
});
}
}
}
}
Во-первых, он получает asyncIterator из интерфейса readline (тот, который мы проксируем / упаковываем) и сохраняет его локально в области видимости, чтобы мы могли использовать его позже.
Затем он возвращает обязательную структуру итератора в форме {next: fn}
. Затем внутри этой функции разворачивается наша логика упаковки. Если мы видели ранее зафиксированную ошибку, мы просто всегда возвращаемPromise.reject(latchedError);
. Если ошибки нет, мы возвращаем созданное вручную обещание.
Внутри функции исполнителя для этого обещания мы регистрируем нашу обработку отклонения, добавляя ее в более высокую область видимости. Set
названный kill
. Это позволяет намfilestream.on('error', ....)
обработчик, чтобы отклонить это обещание, если он обнаружит ошибку, вызвав эту функцию.
Затем мы звоним linesIterator.next()
чтобы получить обещание, что он вернется. Мы регистрируем интерес как к обратным вызовам разрешения, так и к отклонению этого обещания. Если это обещание правильно разрешено, мы удаляем наш обработчик отклонения из области более высокого уровня (чтобы обеспечить лучшую сборку мусора в нашей области), а затем разрешаем наше обещание обертывания / прокси с тем же разрешенным значением.
Если это обещание linesIterator отклоняется, мы просто передаем отклонение прямо через наше обещание переноса / прокси.
Наша собственная обработка ошибок файлового потока
Итак, теперь последнее объяснение. У нас есть этот обработчик ошибок, просматривающий поток:
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
Это делает две вещи. Во-первых, он сохраняет / фиксирует ошибку, поэтому любые будущие вызовы итератора строк будут просто отклоняться с этой предыдущей ошибкой. Во-вторых, если есть какие-либо ожидающие выполнения обещания от итератора строк, ожидающие разрешения, он циклически проходит черезkill
Устанавливайте и отвергайте эти обещания. Это то, что заставляет asyncIterator должным образом отклонять обещание. Это должно происходить внутриreadline
код, но поскольку он делает это неправильно, мы принудительно отклоняем наше обещание обертывания / прокси, чтобы вызывающий видел правильное отклонение, когда поток получает ошибку.
В конце концов, вы можете просто сделать это, поскольку все уродливые детали скрыты за завернутой asyncIterable
:
async function runIt() {
for await (let line of processLineByLine("xfile1.txt")) {
console.log(line);
}
}
runIt().then(() => {
console.log("done");
}).catch(err => {
console.log("final Error", err);
});
Обработка ошибок строки чтения
Я также изо всех сил пытался получить readline, чтобы иметь возможность выдавать любые состояния ошибки. Мне удалось заставить readstream выдавать ошибки, если файл не найден. (показано в коде ниже) Но writestream никогда этого не делает.
Мой вариант использования заключался в том, что я не хотел использовать много памяти для чтения всего файла, а затем для преобразования из JSON в объект. Просто построчно - отсюда и readline. Как я теперь вижу, Node.js проделал некоторую работу над версией 19.xxx? но он еще не готов к производству (по состоянию на 25 октября 2022 г.)
Я попытался преобразовать readline в обещание, но в текущей версии у него много обручей.
Это рабочая оболочка, которая показывает мою структуру, если это помогает другим. Требуется больше работы, чтобы сделать его асинхронным/ожидающим, но здесь требуется большая осторожность, чтобы избежать условий гонки чтения или записи.
////////////////////////////////////////////////////////////////////////
// Simple file stream that can be used to find/edit/remove/filter data
// Example; User name password email
////////////////////////////////////////////////////////////////////////
const readline = require('readline');
const fs = require('fs');
Let obj={};
// A file called json_users.txt it has JSON strings terminated with line feed
//{"id":1,"username":"","password":"","email":""}\n
const readStream2 = fs.createReadStream( "json_users.txt" );
const writeStream2 = fs.createWriteStream( "update_users.txt", { encoding: "utf8"} );
// Some sort of Read error handler - works if no file
readStream2.on('error', function (err) {
console.log("This is a read stream error "+ err);
});
// Some sort of Write error handler -
// but never called even with a file name like "$$--!!.$$$"
writeStream2.on('error', function (err) {
console.log("This is a write stream error "+ err);
});
// Create readline with input read stream and output write stream
const rl = readline.createInterface({
input: readStream2,
output: writeStream2,
terminal: false,
crlfDelay: Infinity,
historySize: 0
});
// readline is event driven on line feed
rl.on('line', (line) => {
obj =JSON.parse(line); // convert line into an object
// Any Filter work goes here e.g. Remove user - find user edit user
if(obj.id==20) { // test if id=20 make username ="
obj.username="Douglas Crockford";
}
// Write object and \n back to stream
writeStream2.write(JSON.stringify(obj)+'\n');
});
// much better way to close stream do this but for now
// await new Promise((res) => rl.once('close', res));
rl.once("close",()=>{
console.log("done");
rl.close;
writeStream2.close;
});
Вот как я справился с этим:
new Promise(async (resolve, reject) => {
const input = fs.createReadStream(o.file)
input.on('error', (err: any) => {
// Handle errors here
reject(new Error(err.stack))
})
const crlfDelay = Infinity
const rl = readline.createInterface({ input, crlfDelay })
rl.on('line', (line) => {
// Handle lines here
})
await events.once(rl, 'close')
resolve(/* some value */)
})
К сожалению, у меня нет времени просматривать весь ответ @jfriend00, но я прочитал первый фрагмент кода, которым он поделился, и он у меня не работает, потому что:
событие запускается до события. Я не знаю, делаю ли я что-то не так, потому что это кажется странным.
'error'
происходит после _
'open'
.
Тем не менее, кажется , что код, который я написал, работает, но ни в коем случае не считайте его готовым к работе (по крайней мере, пока не пройдет время, и я не найду с ним никаких проблем).
Из того, что я вижу, узел 16 добавил асинхронный метод,
createReadStream
, к
FileHandle
класс (https://nodejs.org/docs/latest-v16.x/api/fs.html#filehandlecreatereadstreamoptions).
Так что я предполагаю, что вы могли бы просто использовать этот метод,
await
это, и (я полагаю) вы получите хорошую ошибку, если это не удастся, но у меня нет времени, чтобы попробовать это, если кто-то хочет попробовать, сделайте это и добавьте комментарий/ответ об этом.
Имейте в виду, что этот код, конечно, будет ограничен узлом 16 и выше, в то время как приведенный выше код работает для узла 14.