Проблема с ручным завершением потока, доступного для записи, с { end: false } в конвейере
Поэтому ниже приведен код, использующий пользовательский поток, который расширяет поток. Преобразование из базовой библиотеки узла. Целью этого кода является чтение файла строка за строкой, каждая строка представляет собой строку, отформатированную в JSON, и выполняет некоторую обработку, чтобы проверить, является ли электронная почта новой.
"use strict";
const fs = require('fs');
const split = require('split');
const request = require('request');
const parseString = require('xml2js').parseString;
const moment = require('moment');
const LimitedParallelStream = require('./limitedParallelStream');
const ApiKey = 'xxxxxxxxxxxxxxxxxxxxx';
const ListID = 'yyyyyyyyyyyyyyyyyyyy';
const sourceReadStream = fs.createReadStream(process.argv[2]);
const resultWriteStream = fs.createWriteStream('results2.txt');
let newEmailCount = 0;
let newEmailByDay = {};
sourceReadStream
.pipe(split())
.pipe(new LimitedParallelStream(3, (entrant, enc, push, done) => {
if(!entrant) return done();
const entrantObj = JSON.parse(entrant);
const { email, entrydttm } = entrantObj;
const entryDayKey = moment(entrydttm).format('YYYY-MM-DD');
console.log(`checking ${email}...`);
const options = {
method: 'GET',
url: 'http://api.com/api/Subscribers.GetSingleSubscriber',
qs:
{
ApiKey,
ListID,
EmailAddress: email
}
};
request(options, function (err, response, body) {
if (err) throw new Error(err);
parseString(body, function (parseErr, result) {
const createdDate = moment(result.date);
let newEmail = 'NO';
if (createdDate.isSameOrAfter(entrydttm)) {
newEmailCount++;
newEmail = 'YES';
if (newEmailByDay.hasOwnProperty(entryDayKey)) {
const { [entryDayKey]: count } = newEmailByDay;
newEmailByDay = { ...newEmailByDay, [entryDayKey]: count + 1 };
} else {
newEmailByDay = { ...newEmailByDay, [entryDayKey]: 1 };
}
}
push(`${email} - entrydttm: ${entrydttm}, createdttm: ${result.anyType.Date[0]}, new: ${newEmail}\n`);
done();
});
});
}))
.pipe(resultWriteStream) // { end: false }
.on('finish', () => {
console.log(`All emails were checked - total new emails: ${newEmailCount}`);
console.log(`New emails by day: ${JSON.stringify(newEmailByDay, null, 2)}`);
})
;
Это прекрасно работает, если я просто консоль записываю окончательный результат в обратный вызов 'finish'.
Однако, когда я попытался использовать {end: false} в канале к resultWriteStream, а затем вручную завершить поток записи, когда поток чтения заканчивается, поток чтения заканчивается до того, как все содержимое будет прочитано.
Sth, как это:
sourceReadStream
.on('end', () => resultWriteStream.end(`All emails were checked - total new emails: ${newEmailCount}`));
LimitedParallelStream.js выглядит так:
"use strict";
const stream = require('stream');
class LimitedParallelStream extends stream.Transform {
constructor(concurrency, userTransform) {
super({objectMode: true});
this.concurrency = concurrency;
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
this.continueCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
if(this.running < this.concurrency) {
done();
} else {
this.continueCallback = done;
}
}
_flush(done) {
if(this.running > 0) {
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
const tmpCallback = this.continueCallback;
this.continueCallback = null;
tmpCallback && tmpCallback();
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = LimitedParallelStream;
Файл для чтения выглядит примерно так, но имеет несколько сотен строк:
{"email": "joe.doe@hotmail.com", "entrydttm": "2019-01-16 14:09:07"} {"email": "bill.gee@gmail.com", "entrydttm": "2019-01-16 13:53:17"}
До сих пор я не понял, как правильно разрешить мне записать окончательный результат в один и тот же файл после того, как он фактически завершил обработку всех строк.
Любая помощь будет высоко ценится!