Notes/src/services/sql.js

290 lines
7.3 KiB
JavaScript
Raw Normal View History

2017-10-21 21:10:33 -04:00
"use strict";
2017-10-24 22:17:48 -04:00
const log = require('./log');
const cls = require('./cls');
// Connection used by every helper in this module; stays undefined until setDbConnection() is called.
let dbConnection;

/**
 * Registers the database connection that all query helpers in this module operate on.
 *
 * @param connection - connection object; this module calls run/get/all/exec/close on it
 */
function setDbConnection(connection) {
    dbConnection = connection;
}
// Close the DB connection (if one has been registered) on process exit, on the listed signals
// and on an uncaught exception.
// NOTE(review): Node.js documents that installing a listener for SIGINT/SIGTERM removes the default
// "exit on signal" behaviour, and an 'uncaughtException' listener suppresses the default crash -
// confirm that process termination is handled elsewhere.
for (const eventType of [`exit`, `SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`]) {
    process.on(eventType, () => {
        if (!dbConnection) {
            // nothing to close yet
            return;
        }

        // closing connection is especially important to fold -wal file into the main DB file
        // (see https://sqlite.org/tempfiles.html for details)
        dbConnection.close();
    });
}
/**
 * Inserts one record into a table.
 *
 * Table and column names are concatenated into the SQL text as-is (only the values are bound
 * as parameters), so they must come from trusted code.
 *
 * @param {string} tableName - target table
 * @param {Object} rec - record to insert; its keys are used as column names, its values as bound parameters
 * @param {boolean} [replace=false] - when true, "INSERT OR REPLACE" is issued instead of a plain "INSERT"
 * @returns the `lastID` property of the result of execute(), or undefined when `rec` has no keys
 *          (in that case an error is logged and nothing is executed)
 */
async function insert(tableName, rec, replace = false) {
    const keys = Object.keys(rec);

    if (keys.length === 0) {
        log.error("Can't insert empty object into table " + tableName);
        return;
    }

    const columns = keys.join(", ");
    // one positional placeholder per column
    const questionMarks = keys.map(() => "?").join(", ");

    const query = "INSERT " + (replace ? "OR REPLACE" : "") + " INTO " + tableName + "(" + columns + ") VALUES (" + questionMarks + ")";

    const res = await execute(query, Object.values(rec));

    return res.lastID;
}
/**
 * Same as insert(), but always with the `replace` flag set.
 *
 * @param {string} tableName - target table
 * @param {Object} rec - record whose keys are column names
 * @returns whatever insert() resolves to
 */
async function replace(tableName, rec) {
    const insertResult = await insert(tableName, rec, true);

    return insertResult;
}
/**
 * Inserts a record, or updates the existing row when the insert conflicts on `primaryKey`.
 *
 * Table, column and primary key names are interpolated into the SQL text as-is, so they must come
 * from trusted code; only the values are bound as parameters.
 *
 * @param {string} tableName - target table
 * @param {string} primaryKey - column(s) used as the ON CONFLICT target
 * @param {Object} rec - record; its keys are used as column names, its values as bound parameters
 * @returns {Promise<undefined>} nothing; when `rec` has no keys an error is logged and nothing is executed
 */
async function upsert(tableName, primaryKey, rec) {
    const keys = Object.keys(rec);

    if (keys.length === 0) {
        log.error("Can't upsert empty object into table " + tableName);
        return;
    }

    const columns = keys.join(", ");

    // the N-th key uses placeholder ":N" in both the VALUES list and the SET list,
    // so the values array handed to execute() has exactly one entry per column
    const questionMarks = keys.map((key, index) => ":" + index).join(", ");
    const updateMarks = keys.map((key, index) => `${key} = :${index}`).join(", ");

    const query = `INSERT INTO ${tableName} (${columns}) VALUES (${questionMarks})
                   ON CONFLICT (${primaryKey}) DO UPDATE SET ${updateMarks}`;

    await execute(query, Object.values(rec));
}
/**
 * Sends "BEGIN" straight to the registered connection.
 *
 * @returns whatever the connection's run() resolves to
 */
async function beginTransaction() {
    const outcome = await dbConnection.run("BEGIN");

    return outcome;
}
/**
 * Sends "COMMIT" straight to the registered connection.
 *
 * @returns whatever the connection's run() resolves to
 */
async function commit() {
    const outcome = await dbConnection.run("COMMIT");

    return outcome;
}
/**
 * Sends "ROLLBACK" straight to the registered connection.
 *
 * @returns whatever the connection's run() resolves to
 */
async function rollback() {
    return await dbConnection.run("ROLLBACK");
}
/**
 * Runs the query through the connection's get() (via wrap()) and returns its result.
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - values spread as positional arguments after the query
 * @returns whatever the connection's get() resolves to
 */
async function getRow(query, params = []) {
    const fetchSingle = async db => db.get(query, ...params);

    return await wrap(fetchSingle, query);
}
/**
 * Returns the first row of the query result, or null when the query yields no rows.
 *
 * Note that the whole result set is fetched through getRows() and only its first element is returned.
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - query parameters
 * @returns first row or null
 */
async function getRowOrNull(query, params = []) {
    const all = await getRows(query, params);

    return all.length > 0 ? all[0] : null;
}
/**
 * Returns the value of the first property of the first row, or null when the query yields no rows.
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - query parameters
 * @returns value of the first property of the first row, or null
 */
async function getValue(query, params = []) {
    const row = await getRowOrNull(query, params);

    if (!row) {
        return null;
    }

    // the "first" property is the first key reported by Object.keys() for the row object
    return row[Object.keys(row)[0]];
}
const PARAM_LIMIT = 900; // actual limit is 999

/**
 * Runs the query repeatedly, each time with at most PARAM_LIMIT of the given parameters, and
 * returns the results of all runs concatenated. This is to overcome 999 limit of number of query parameters.
 *
 * Every "???" occurrence in the query is replaced with the numbered placeholder list
 * ("?1,?2,...") for the current batch. With no parameters the query is not run at all.
 *
 * @param {string} query - SQL text containing "???" where the placeholder list belongs
 * @param {Array} params - all parameter values
 * @returns {Promise<Array>} rows of all batches, in batch order
 */
async function getManyRows(query, params) {
    let collected = [];
    let remaining = params;

    while (remaining.length > 0) {
        const batch = remaining.slice(0, Math.min(remaining.length, PARAM_LIMIT));
        remaining = remaining.slice(batch.length);

        // placeholders are numbered from 1 within each batch
        const placeholders = batch
            .map((value, index) => "?" + (index + 1))
            .join(",");

        const batchQuery = query.replace(/\?\?\?/g, placeholders);
        const batchRows = await getRows(batchQuery, batch);

        collected = collected.concat(batchRows);
    }

    return collected;
}
/**
 * Runs the query through the connection's all() (via wrap()) and returns its result.
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - values spread as positional arguments after the query
 * @returns whatever the connection's all() resolves to
 */
async function getRows(query, params = []) {
    const fetchAll = async db => db.all(query, ...params);

    return await wrap(fetchAll, query);
}
2017-11-02 22:55:22 -04:00
/**
 * Builds a plain object from a query result: for every row, the value of its first property
 * becomes the key and the value of its second property becomes the value.
 * When several rows share the same first value, the last one wins.
 *
 * @param {string} query - SQL text, expected to select (at least) two columns
 * @param {Array} [params=[]] - query parameters
 * @returns {Promise<Object>} object mapping first-column values to second-column values
 */
async function getMap(query, params = []) {
    const map = {};
    const results = await getRows(query, params);

    for (const row of results) {
        const keys = Object.keys(row);

        map[row[keys[0]]] = row[keys[1]];
    }

    return map;
}
/**
 * Returns the values of the first column of the query result as an array.
 *
 * The column is identified by the first property name of the first row; that same property is
 * then read from every row.
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - query parameters
 * @returns {Promise<Array>} one value per row; empty array when the query yields no rows
 */
async function getColumn(query, params = []) {
    const result = await getRows(query, params);

    if (result.length === 0) {
        return [];
    }

    const key = Object.keys(result[0])[0];

    return result.map(row => row[key]);
}
/**
 * Runs a statement through the connection's run() (via wrap()), after first giving
 * startTransactionIfNecessary() the chance to open a transaction.
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - values spread as positional arguments after the query
 * @returns whatever the connection's run() resolves to
 */
async function execute(query, params = []) {
    await startTransactionIfNecessary();

    const runStatement = async db => db.run(query, ...params);

    return await wrap(runStatement, query);
}
2020-06-14 00:35:53 +02:00
/**
 * Runs a statement directly on the registered connection: no transaction is started and the
 * call does not go through wrap().
 *
 * @param {string} query - SQL text
 * @param {Array} [params=[]] - values spread as positional arguments after the query
 * @returns {Promise<undefined>} nothing; the result of run() is discarded
 */
async function executeWithoutTransaction(query, params = []) {
    const runArguments = [query, ...params];

    await dbConnection.run(...runArguments);
}
2019-11-01 22:09:51 +01:00
/**
 * Executes a statement with an arbitrarily long parameter list by delegating to getManyRows()
 * (which splits the parameters into batches), after first giving startTransactionIfNecessary()
 * the chance to open a transaction. Any rows returned are discarded.
 *
 * @param {string} query - SQL text containing "???" where the placeholder list belongs
 * @param {Array} params - all parameter values
 * @returns {Promise<undefined>} nothing
 */
async function executeMany(query, params) {
    await startTransactionIfNecessary();

    // essentially just alias
    await getManyRows(query, params);
}
/**
 * Runs SQL text through the connection's exec() (via wrap()), after first giving
 * startTransactionIfNecessary() the chance to open a transaction.
 *
 * @param {string} query - SQL text handed to exec() unchanged
 * @returns whatever the connection's exec() resolves to
 */
async function executeScript(query) {
    await startTransactionIfNecessary();

    return await wrap(async db => db.exec(query), query);
}
/**
 * Runs a database operation against the registered connection, logging slow queries and
 * converting failures into an error created in this function.
 *
 * @param {function} func - async callback that receives the connection and performs the operation
 * @param {string} query - SQL text of the operation, used only for logging
 * @returns whatever `func` resolves to
 * @throws {Error} "DB connection not initialized yet" when no connection has been registered;
 *         otherwise, when `func` fails, an Error whose message is the stack of the original
 *         failure and whose `cause` property is the original error object
 */
async function wrap(func, query) {
    if (!dbConnection) {
        throw new Error("DB connection not initialized yet");
    }

    // created before the operation is awaited; presumably so that the rethrown error carries
    // the stack of the code that issued the query
    const thisError = new Error();

    try {
        const startTimestamp = Date.now();

        const result = await func(dbConnection);

        const milliseconds = Date.now() - startTimestamp;

        if (milliseconds >= 300) {
            // the text of recursive queries is left out of the log message
            if (query.includes("WITH RECURSIVE")) {
                log.info(`Slow recursive query took ${milliseconds}ms.`);
            }
            else {
                log.info(`Slow query took ${milliseconds}ms: ${query}`);
            }
        }

        return result;
    }
    catch (e) {
        log.error("Error executing query. Inner exception: " + e.stack + thisError.stack);

        thisError.message = e.stack;
        // keep the original error object reachable so callers can still inspect its properties
        thisError.cause = e;

        throw thisError;
    }
}
// true while some write transaction is open on the connection (module-wide flag).
// The per-CLS flags are separate: 'isTransactional' marks a CLS context running inside transactional(),
// 'isInTransaction' marks that this context has actually started the transaction.
let transactionActive = false;

// resolves when current transaction ends with either COMMIT or ROLLBACK
let transactionPromise = null;
// resolver of transactionPromise
let transactionPromiseResolve = null;
/**
 * Opens a transaction for the current CLS context when one is needed.
 *
 * Does nothing when the context is not marked 'isTransactional' or when it is already marked
 * 'isInTransaction'. Otherwise waits until no other transaction is active, marks the transaction
 * as active (module-wide and in the CLS context) and issues BEGIN.
 *
 * @returns {Promise<undefined>} resolves once BEGIN has been issued (or immediately when nothing is needed)
 */
async function startTransactionIfNecessary() {
    if (!cls.namespace.get('isTransactional')
        || cls.namespace.get('isInTransaction')) {
        return;
    }

    // re-check the flag after every wake-up: another waiter may have taken the slot in the meantime
    while (transactionActive) {
        await transactionPromise;
    }

    // first set semaphore (atomic operation) and only then start transaction
    transactionActive = true;
    transactionPromise = new Promise(res => transactionPromiseResolve = res);

    cls.namespace.set('isInTransaction', true);

    await beginTransaction();
}
async function transactional(func) {
// if the CLS is already transactional then the whole transaction is handled by higher level transactional() call
if (cls.namespace.get('isTransactional')) {
return await func();
}
2020-06-14 00:35:53 +02:00
cls.namespace.set('isTransactional', true); // this signals that transaction will be needed if there's a write operation
try {
const ret = await func();
if (cls.namespace.get('isInTransaction')) {
await commit();
// note that sync rows sent from this action will be sent again by scheduled periodic ping
2020-01-31 22:32:24 +01:00
require('./ws.js').sendPingToAllClients();
}
return ret;
}
catch (e) {
2020-06-14 00:35:53 +02:00
if (cls.namespace.get('isInTransaction')) {
await rollback();
2020-06-14 00:35:53 +02:00
}
throw e;
}
finally {
cls.namespace.set('isTransactional', false);
2020-06-14 00:35:53 +02:00
if (cls.namespace.get('isInTransaction')) {
transactionActive = false;
cls.namespace.set('isInTransaction', false);
2020-06-14 00:35:53 +02:00
// resolving even for rollback since this is just semaphore for allowing another write transaction to proceed
transactionPromiseResolve();
}
2017-11-28 18:33:23 -05:00
}
}
module.exports = {
setDbConnection,
insert,
2017-10-29 22:22:30 -04:00
replace,
getValue,
getRow,
getRowOrNull,
getRows,
getManyRows,
2017-11-02 22:55:22 -04:00
getMap,
getColumn,
execute,
2020-06-14 00:35:53 +02:00
executeWithoutTransaction,
2019-11-01 22:09:51 +01:00
executeMany,
2017-10-15 17:31:49 -04:00
executeScript,
transactional,
upsert
};