Процесс Node.js завершается без ошибок (используя потоки)
Я пишу лямбда-функцию, которая получает список текстовых файлов на S3, объединяет их вместе, а затем архивирует полученный файл. По какой-то причине, функция запускается в середине процесса, без ошибок.
Полезная нагрузка, отправляемая в функцию Lambda, выглядит следующим образом:
{
"sourceFiles": [
"s3://bucket/largefile1.txt",
"s3://bucket/largefile2.txt"
],
"destinationFile": "s3://bucket/concat.zip",
"compress": true,
"omitHeader": false,
"preserveSourceFiles": true
}
Сценарии, в которых эта функция работает совершенно нормально:
- Два файла маленькие, и
compress === false
- Два файла маленькие, и
compress === true
- Два файла большие, и
compress === false
Если я пытаюсь сжать два больших файла, он выходит посередине. Сам процесс конкатенации работает нормально, но когда он пытается использовать zip-stream
добавить поток в архив не получается.
Два больших файла вместе составляют 483 833 байта. Когда лямбда-функция завершается ошибкой, она считывает либо 290,229, либо 306,589 байт (это случайно), а затем завершает работу.
Это главная точка входа в функцию:
const packer = require('zip-stream');
const S3 = require('aws-sdk/clients/s3');
const s3 = new S3({ apiVersion: '2006-03-01' });
const { concatCsvFiles } = require('./csv');
const { s3UrlToParts } = require('./utils');
function addToZip(archive, stream, options) {
return new Promise((resolve, reject) => {
archive.entry(stream, options, (err, entry) => {
console.log('entry done', entry);
if (err) reject(err);
resolve(entry);
});
});
}
export const handler = async event => {
/**
* concatCsvFiles returns a readable stream to pass to either the archiver or
* s3.upload.
*/
let bytesRead = 0;
try {
const stream = await concatCsvFiles(event.sourceFiles, {
omitHeader: event.omitHeader,
});
stream.on('data', chunk => {
bytesRead += chunk.length;
console.log('read', bytesRead, 'bytes so far');
});
stream.on('end', () => {
console.log('this is never called :(');
});
const dest = s3UrlToParts(event.destinationFile);
let archive;
if (event.compress) {
archive = new packer();
await addToZip(archive, stream, { name: 'concat.csv' });
archive.finalize();
}
console.log('uploading');
await s3
.upload({
Body: event.compress ? archive : stream,
Bucket: dest.bucket,
Key: dest.key,
})
.promise();
console.log('done uploading');
if (!event.preserveSourceFiles) {
const s3Objects = event.sourceFiles.map(s3Url => {
const { bucket, key } = s3UrlToParts(s3Url);
return {
bucket,
key,
};
});
await s3
.deleteObjects({
Bucket: s3Objects[0].bucket,
Delete: {
Objects: s3Objects.map(s3Obj => ({ Key: s3Obj.key })),
},
})
.promise();
}
console.log('## Never gets here');
// return {
// newFile: event.destinationFile,
// };
} catch (e) {
if (e.code) {
throw new Error(e.code);
}
throw e;
}
};
И это код конкатенации:
import MultiStream from 'multistream';
import { Readable } from 'stream';
import S3 from 'aws-sdk/clients/s3';
import { s3UrlToParts } from './utils';
const s3 = new S3({ apiVersion: '2006-03-01' });
/**
* Takes an array of S3 URLs and returns a readable stream of the concatenated results
* @param {string[]} s3Urls Array of S3 URLs
* @param {object} options Options
* @param {boolean} options.omitHeader Omit the header from the final output
*/
export async function concatCsvFiles(s3Urls, options = {}) {
// Get the header so we can use the length to set an offset in grabbing files
const firstFile = s3Urls[0];
const file = s3UrlToParts(firstFile);
const data = await s3
.getObject({
Bucket: file.bucket,
Key: file.key,
Range: 'bytes 0-512', // first 512 bytes is pretty safe for header size
})
.promise();
const streams = [];
const [header] = data.Body.toString().split('\n');
for (const s3Url of s3Urls) {
const { bucket, key } = s3UrlToParts(s3Url);
const stream = s3
.getObject({
Bucket: bucket,
Key: key,
Range: `bytes=${header.length + 1}-`, // +1 for newline char
})
.createReadStream();
streams.push(stream);
}
if (!options.omitHeader) {
const headerStream = new Readable();
headerStream.push(header + '\n');
headerStream.push(null);
streams.unshift(headerStream);
}
const combinedStream = new MultiStream(streams);
return combinedStream;
}
1 ответ
Понял. Проблема была на самом деле с zip-stream
библиотека. Очевидно, что это не работает с потоковой передачей S3 +. Я старался yazl
и это работает отлично.