Как вы оборачиваете записи sequelize, которые происходят в STREAM node.js в транзакции?
Я проверяю файлы CSV и затем записываю их в базу данных. Текущая настройка - File.pipe(validationStream).pipe(DBWriteStream); Как можно обернуть все записи в моем DBWriteStream в транзакцию, которую можно отменить, если в validationStream обнаружены ошибки? Если ошибки найдены, я хочу отменить весь файл.
Node.js 7, как использовать sequelize транзакции с async / await?
Приведенный выше пример, а также приведенный в документации, зависят от цепочек обещаний и асинхронных функций, однако мой поток является функцией записи потока преобразования и не использует обещание.
async function getWriteStream(params) {
const rowLimit = 2000;
let rowsToWrite = [];
const writeStream = through2.obj(write, end);
// I'd like to rollback this report creation
const result = await createReportId(params);
const reportId = result.id;
// buffer = {isValid: bool, data: rowData}
// if I receive an "isValid: false" from the validation stream,
// i'd like to stop writing and rollback everything
function write(buffer, encoding, next) {
let rollBack = false;
if (buffer.isValid && !rollBack) {
buffer.data.forEach(e => {
e.some_report_id = reportId;
pushRows(e);
});
} else if (!rollBack) {
rollBack = true;
// rollback transaction?
}
next();
}
function end(done) {
writeRows();
done();
}
async function pushRows(row) {
rowsToWrite.push(row);
if (rowsToWrite.length >= rowLimit) {
await writeRows();
}
}
// want to rollback all of these writes
async function writeRows() {
if (rowsToWrite.length > 0) {
const toSave = rowsToWrite.slice(0);
rowsToWrite = [];
await Some.bulkCreate(toSave);
}
}
return writeStream;
}
Моя проблема в том, что я не хочу выдавать ошибку и останавливать поток проверки, потому что, если в CSV есть несколько ошибок, я хотел бы собрать всю эту информацию в первый раз и отправить ее пользователю.
Кроме того, строки на самом деле записываются в функции "запись" потока преобразования, которая не является асинхронной функцией, поэтому я не могу ее ждать.