Пакетное обновление в knex
Я хочу выполнить пакетное обновление с использованием Knex.js
Например:
'UPDATE foo SET [theValues] WHERE idFoo = 1'
'UPDATE foo SET [theValues] WHERE idFoo = 2'
со значениями:
{ name: "FooName1", checked: true } // to `idFoo = 1`
{ name: "FooName2", checked: false } // to `idFoo = 2`
Ранее я использовал node-mysql, что позволило использовать несколько операторов. Используя это, я просто создал строку запроса из нескольких операторов и просто отправил ее по проводам за один прогон.
Я не уверен, как добиться того же с Knex. я могу видеть batchInsert
в качестве метода API я могу использовать, но ничего, насколько batchUpdate
обеспокоен.
Замечания:
Я могу сделать асинхронную итерацию и обновить каждую строку отдельно. Это плохо, потому что это означает, что от сервера до БД будет много обращений
Я могу использовать
raw()
вещь Knex и, вероятно, сделать что-то похожее на то, что я делаю с node-mysql. Тем не менее, это побеждает всю цель knex- быть уровнем абстракции БД (вводит сильную связь с БД)
Поэтому я хотел бы сделать это, используя что-то "knex-y".
Любые идеи приветствуются.
5 ответов
У вас есть хорошее представление о плюсах и минусах каждого подхода. Я бы порекомендовал необработанный запрос, который массово обновляется через несколько асинхронных обновлений Да, вы можете запускать их параллельно, но вашим узким местом становится время, необходимое БД для запуска каждого обновления. Подробности можно найти здесь.
Ниже приведен пример загрузки пакета с использованием knex.raw. Предположим, что записи - это массив объектов (один объект для каждой строки, которую мы хотим обновить), значениями которых являются имена свойств, совпадающие со столбцами в базе данных, которую вы хотите обновить:
var knex = require('knex'),
_ = require('underscore');
function bulkUpdate (records) {
var updateQuery = [
'INSERT INTO mytable (primaryKeyCol, col2, colN) VALUES',
_.map(records, () => '(?)').join(','),
'ON DUPLICATE KEY UPDATE',
'col2 = VALUES(col2),',
'colN = VALUES(colN)'
].join(' '),
vals = [];
_(records).map(record => {
vals.push(_(record).values());
});
return knex.raw(updateQuery, vals);
}
Этот ответ отлично справляется с объяснением отношений между двумя подходами во время выполнения.
Редактировать:
Было предложено, чтобы я показал, что records
будет выглядеть как в этом примере.
var records = [
{ primaryKeyCol: 123, col2: 'foo', colN: 'bar' },
{ // some other record, same props }
];
Обратите внимание, что если ваш record
имеет дополнительные свойства, чем те, которые вы указали в запросе, вы не можете сделать:
_(records).map(record => {
vals.push(_(record).values());
});
Поскольку вы передадите слишком много значений запросу на запись, а knex не сможет сопоставить значения свойств каждой записи с ?
символы в запросе. Вместо этого вам нужно будет явно помещать значения в каждую запись, которую вы хотите вставить в массив следующим образом:
// assume a record has additional property `type` that you dont want to
// insert into the database
// example: { primaryKeyCol: 123, col2: 'foo', colN: 'bar', type: 'baz' }
_(records).map(record => {
vals.push(record.primaryKeyCol);
vals.push(record.col2);
vals.push(record.colN);
});
Есть менее повторяющиеся способы сделать вышеупомянутые явные ссылки, но это только пример. Надеюсь это поможет!
Мне нужно было выполнить пакетное обновление внутри транзакции (я не хотел получать частичные обновления в случае, если что-то пошло не так). Я решил это следующим образом:
// I wrap knex as 'connection'
return connection.transaction(trx => {
const queries = [];
users.forEach(user => {
const query = connection('users')
.where('id', user.id)
.update({
lastActivity: user.lastActivity,
points: user.points,
})
.transacting(trx); // This makes every update be in the same transaction
queries.push(query);
});
Promise.all(queries) // Once every query is written
.then(trx.commit) // We try to execute all of them
.catch(trx.rollback); // And rollback in case any of them goes wrong
});
Предполагая, что у вас есть коллекция допустимых ключей / значений для данной таблицы:
// abstract transactional batch update
function batchUpdate(table, collection) {
return knex.transaction(trx => {
let queries = collection.map(tuple =>
knex(table)
.where('id', tuple.id)
.update(tuple)
.transacting(trx)
);
return Promise.all(queries)
.then(trx.commit)
.catch(trx.rollback);
});
}
Называть это
batchUpdate('user', [...]);
Вы, к сожалению, подвержены нетрадиционным именам столбцов? Не беспокойся, я понял тебя
function batchUpdate(options, collection) {
return knex.transaction((trx) => {
let queries = collection.map(tuple =>
knex(options.table)
.where(options.column, tuple[options.column])
.update(tuple)
.transacting(trx)
);
return Promise.all(queries)
.then(trx.commit)
.catch(trx.rollback);
});
}
Называть это
batchUpdate({ table: 'user', column: 'user_id' }, [...]);
Обновление можно выполнять партиями, т. е. 1000 строк в партии.
И пока он делает это партиями, можно использовать карту bluebird .
Для получения дополнительной информации о карте Bluebird: http://bluebirdjs.com/docs/api/promise.map.html
const limit = 1000;
const totalRows = 50000;
const seq = count => Array(Math.ceil(count / limit)).keys();
map(seq(totalRows), page => updateTable(dbTable, page), { concurrency: 1 });
const updateTable = async (dbTable, page) => {
let offset = limit* page;
return knex(dbTable).pluck('id').limit(limit).offset(offset).then(ids => {
return knex(dbTable)
.whereIn('id', ids)
.update({ date: new Date() })
.then((rows) => {
console.log(`${page} - Updated rows of the table ${dbTable} from ${offset} to ${offset + batch}: `, rows);
})
.catch((err) => {
console.log({ err });
});
})
.catch((err) => {
console.log({ err });
});
};
Где pluck() используется для получения идентификаторов в виде массива
Решение отлично работает для меня! Я просто включаю параметр ID, чтобы сделать его динамическим в таблицах с пользовательскими тегами ID. Ченхай, вот мой фрагмент, включающий способ вернуть один массив значений идентификаторов для транзакции:
function batchUpdate(table, id, collection) {
return knex.transaction((trx) => {
const queries = collection.map(async (tuple) => {
const [tupleId] = await knex(table)
.where(`${id}`, tuple[id])
.update(tuple)
.transacting(trx)
.returning(id);
return tupleId;
});
return Promise.all(queries).then(trx.commit).catch(trx.rollback);
});
}
Вы можете использовать
response = await batchUpdate("table_name", "custom_table_id", [array of rows to update])
чтобы получить возвращенный массив идентификаторов.