Выход из цикла с транзакциями ACID с помощью Knex SQL Query Builder и Bluebird в Node.js

Мы используем Knex SQL Query Builder для выполнения ACID-транзакции в Node и испытываем странное поведение при использовании цикла с Knex. Код ниже принимает массив таблиц и затем условно выполняет вставку или обновление. Первая таблица - "actionHeader "и обрабатывается первой. Затем строки в таблице транзакции обрабатываются в рамках всей транзакции. Новые ключи (rowids) накапливаются в массиве 'rowids'.

ПРОБЛЕМА: Основная проблема заключается в том, что кажется невозможным выйти из цикла в processTransactionDetail(), если из Knex возвращается ошибка. Ни throw, ни return не будут выходить из цикла или функции. Это означает, что если при обработке транзакции Deetail возникнет ошибка, он продолжит обработку оставшихся строк перед выходом.

    let rowids: any[] = [];

    knex.transaction(function(trx) {

        // Process transactionHeader
        if (transactionHeader.rowid) {

            // Update transactionHeader
            trx('transaction')
                .transacting(trx)
                .update(transactionHeader)
                .where('rowid', transactionHeader.rowid)
                .then(function(transactionrowid) {
                    rowids.push({ table: 'transaction', rowid: transactionHeader.rowid });
                    // Update transactionDetail rows.
                    processTransactionDetail(transactionrowid, trx);
                })
                .catch(trx.rollback);

        } else {

            // Insert transactionHeader
            trx('transaction')
                .transacting(trx)
                .insert(transactionHeader, 'rowid')
                .then(function(transactionrowid) {
                    rowids.push({ table: 'transaction', rowid: transactionrowid });
                    // Insert transactionDetail rows.
                    processTransactionDetail(transactionrowid, trx);
                })
                .catch(trx.rollback);
        }

    }).then(function(inserts) {
        console.log('success!', rowids)
        callback(null, { success: true }, { data: rowids })
        return;
    }).catch(function(error) {
        console.error('error', error);
        callback(null, {
            success: false,
            message: error.message
        }, { data: rowids })
        return;
    });

    /*
    * Process transactionDetail rows.
    */
    function processTransactionDetail(transactionHeaderRowID: number, trx) {

        var promise: any;
        let table: TABLE;
        let rowid: number;

        for (let i = 1; i < tablesToProcess.length; i++) {

            table = tablesToProcess[i];
            rowid = table.data[0].rowid;

            // Update
            if (rowid) {

                for (let row = 0; row < table.data.length; row++) {

                    promise = knex(table.name)
                        .transacting(trx)
                        .update(table.data[row])
                        .where('rowid', rowid)
                        .then(function(rowid) {                                
                            rowids.push({ table: table.name, rowid: rowid });
                        .catch(function(error) {                                
                             // --------------------------------
                             // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
                             // --------------------------------
                             throw 'error';
                             return;
                             // --------------------------------
                        })
                 } 

            // Insert
            } else {

                for (let row = 0; row < table.data.length; row++) {

                    promise = knex(table.name)
                        .transacting(trx)
                        .insert(table.data[row])
                        .then(function(rowid) {
                            rowids.push({ table: table.name, rowid: rowid });
                        })
                        .catch(function(error) {
                             // --------------------------------
                             // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
                             // --------------------------------
                             throw 'error';
                             return;
                             // --------------------------------
                        });
                }
            }
        }

        promise.then(function(x) {
            promise.then(trx.commit);
        });
    }

ОБНОВЛЕНО: это правильная структура? Не уверен, что обработчики ошибок внизу действительно нужны.

    knex.transaction(function(trx) {

        // Update Row
        function updateRow(table, rowid, row) {
            return knex(table.name)
                .transacting(trx)
                .update(row)
                .where('rowid', rowid)
                .then(function(rowid) {

                    rowids.push({
                        table: table.name,
                        rowid: rowid
                    });
                });
        }

        // Insert Row
        function insertRow(table, rowid, row) {
            return knex(table.name)
                .transacting(trx)
                .insert(row)
                .then(function(rowid) {

                    rowids.push({
                        table: table.name,
                        rowid: rowid
                    });
                });
        }

        // Process Tables
        Promise.mapSeries(tablesToProcess, function(table) {

            let rowid = table.data[0].rowid;

            // choose the right function to apply to the rows
            var fn = rowid ? updateRow : insertRow;

            // fn needs table and rowid
            fn = fn.bind(this, table, rowid);

            // Process Rows
            return Promise.mapSeries(table.data, fn)
                .then(function(result) {

                    // result is an array with all the knex promises result
                    return result;

                }).catch(function(err) {
                    console.log('an error happened');
                    //trx.rollback();  // QUESTION: IS THIS NEEDED?
                    throw err;     // IS THERE A WAY TO
                });

        }).then(function(result) {
            console.log('success', result);
            trx.commit();
            // callback(null, { success: true }, { data: rowids })
            // return;
        }).catch(function(error) {
            console.log('error', error);
            trx.rollback();
            callback(null, { success: false, message: error.message }, { data: rowids })
        });

    }).then(function(inserts) {
        console.log('success!', rowids)
        callback(null, { success: true }, { data: rowids })
    }).catch(function(error) {
        console.log('error', error);
        callback(null, { success: false, message: error.message }, { data: rowids })
    });

1 ответ

Решение

Вы имеете дело с обещаниями, поэтому вы должны использовать способ их поддержки. Есть блюбердmapSeries() например:

var Promise = require('bluebird');

function updateRow(table, rowid, row) {
  return knex(table.name)
    .transacting(trx)
    .update(table.data[row])
    .where('rowid', rowid)
    .then(function(rowid) {
      rowids.push({
        table: table.name,
        rowid: rowid
      });
    });
}

function insertRow(table, rowid, row) {
  return knex(table.name)
    .transacting(trx)
    .insert(table.data[row])
    .then(function(rowid) {
      rowids.push({
        table: table.name,
        rowid: rowid
      });
    });
}

// if there is an error, the iteration will stop immediately
Promise.mapSeries(tablesToProcess, function(table) {
  rowid = table.data[0].rowid;

  // choose the right function to apply to the rows
  var fn = rowid ? updateRow : insertRow;
  // fn need table and rowid
  fn = fn.bind(this, table, rowid);

  // call fn for each row
  // if there is an error, the iteration will stop immediately
  return Promise.mapSeries(table.data, fn)
    .then(function(result) {
      // result is an array with all the knex promises result
      return result;
    }).catch(function(err) {
      console.log('an error happened');
      trx.rollback();
      throw err;
    });
}).then(function(result) {
  console.log('all is good');
  // you can safely commit here
  trx.commit();
}).catch(function(err) {
  console.log('an error happened');
  trx.rollback();
});

Обновить

О ваших вопросах:

knex.transaction(function (trx) {
    // Update Row
    function updateRow(table, rowid, row) {
        return knex(table.name)
            .transacting(trx)
            .update(row)
            .where('rowid', rowid)
            .then(function (rowid) {
                rowids.push({
                    table: table.name,
                    rowid: rowid
                });
            });
    }

    // Insert Row
    function insertRow(table, rowid, row) {
        return knex(table.name)
            .transacting(trx)
            .insert(row)
            .then(function (rowid) {
                rowids.push({
                    table: table.name,
                    rowid: rowid
                });
            });
    }

    // you need to return here so the 'then(function (inserts)' and the catch work
    // Process Tables
    return Promise.mapSeries(tablesToProcess, function (table) {
        let rowid = table.data[0].rowid;

        // choose the right function to apply to the rows
        var fn = rowid ? updateRow : insertRow;

        // fn needs table and rowid
        fn = fn.bind(this, table, rowid);

        // Process Rows
        // if you don't do anything special in the then and the catch, you can remove them
        return Promise.mapSeries(table.data, fn)
            .then(function (result) {
                // result is an array with all the knex promises result
                return result;
            }).catch(function (err) {
                // this catch is not necessary,
                // you can remove it you don't need to do something here
                console.log('an error happened');
                //trx.rollback();  // QUESTION: IS THIS NEEDED? << // no, my mistake, the rollback is done on the other catch
                throw err;     // IS THERE A WAY TO
            });
    }).then(function (result) {
        console.log('success', result);
        trx.commit();
        return result;
        // callback(null, { success: true }, { data: rowids })
        // return;
    }).catch(function (error) {
        console.log('error', error);
        trx.rollback();
        throw error; // always rethrow error when you chain, if you don't, it's like the promise is resolved (ok) 
        // you already do this below
        // callback(null, { success: false, message: error.message }, { data: rowids });
    });
}).then(function (inserts) {
    console.log('success!', rowids)
    callback(null, { success: true }, { data: rowids })
}).catch(function (error) {
    console.log('error', error);
    callback(null, { success: false, message: error.message }, { data: rowids })
});

2 то и 2 улов внизу могут быть объединены. Кроме того, почему есть обратный вызов? Лучше не смешивать обещания и обратные вызовы, если вы не можете избежать этого.

Другие вопросы по тегам