"use strict";

const log = require('./log');
const sql = require('./sql');
const sqlInit = require('./sql_init');
const optionService = require('./options');
const utils = require('./utils');
const sourceIdService = require('./source_id');
const dateUtils = require('./date_utils');
const syncUpdateService = require('./sync_update');
const contentHashService = require('./content_hash');
const appInfo = require('./app_info');
const syncOptions = require('./sync_options');
const syncMutexService = require('./sync_mutex');
const cls = require('./cls');
const request = require('./request');
const ws = require('./ws');
const entityChangesService = require('./entity_changes.js');
const entityConstructor = require('../entities/entity_constructor');

let proxyToggle = true;

const stats = {
    outstandingPushes: 0,
    outstandingPulls: 0
};
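
// runs one full sync cycle under the sync mutex: login, push, pull, push again,
// then a content hash check; the cycle repeats while the hash check reports mismatches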
async function sync() {
    try {
        return await syncMutexService.doExclusively(async () => {
            if (!syncOptions.isSyncSetup()) {
                return { success: false, message: 'Sync not configured' };
            }

            let continueSync = false;

            do {
                const syncContext = await login();

                await pushSync(syncContext);

                await pullSync(syncContext);

                await pushSync(syncContext);

                await syncFinished(syncContext);

                continueSync = await checkContentHash(syncContext);
            }
            while (continueSync);

            return {
                success: true
            };
        });
    }
    catch (e) {
        proxyToggle = !proxyToggle;

        if (e.message &&
            (e.message.includes('ECONNREFUSED') ||
             e.message.includes('ERR_CONNECTION_REFUSED') ||
             e.message.includes('Bad Gateway'))) {

            log.info("No connection to sync server.");

            return {
                success: false,
                message: "No connection to sync server."
            };
        }
        else {
            log.info("sync failed: " + e.message + "\nstack: " + e.stack);

            return {
                success: false,
                message: e.message
            };
        }
    }
}
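
// makes sure the sync server already has the schema and seed (sends the seed if not)
// before performing the actual login handshake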
async function login() {
    const setupService = require('./setup'); // circular dependency issue

    if (!await setupService.hasSyncServerSchemaAndSeed()) {
        await setupService.sendSeedToSyncServer();
    }

    return await doLogin();
}
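
// authenticates against the sync server by sending an HMAC of the current UTC timestamp
// keyed with the document secret; also rejects servers reporting a local source ID and
// lowers lastSyncedPull if it is ahead of the server's max entity change ID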
async function doLogin() {
    const timestamp = dateUtils.utcNowDateTime();

    const documentSecret = optionService.getOption('documentSecret');
    const hash = utils.hmac(documentSecret, timestamp);

    const syncContext = { cookieJar: {} };
    const resp = await syncRequest(syncContext, 'POST', '/api/login/sync', {
        timestamp: timestamp,
        syncVersion: appInfo.syncVersion,
        hash: hash
    });

    if (sourceIdService.isLocalSourceId(resp.sourceId)) {
        throw new Error(`Sync server has source ID ${resp.sourceId} which is also local. Your sync setup is probably trying to connect to itself.`);
    }

    syncContext.sourceId = resp.sourceId;

    const lastSyncedPull = getLastSyncedPull();

    // this is important in a scenario where the sync is set up by manually copying the document;
    // lastSyncedPull could then be pretty far off for the newly cloned client
    if (lastSyncedPull > resp.maxEntityChangeId) {
        log.info(`Lowering last synced pull from ${lastSyncedPull} to ${resp.maxEntityChangeId}`);

        setLastSyncedPull(resp.maxEntityChangeId);
    }

    return syncContext;
}
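
// repeatedly asks the sync server for entity changes newer than lastSyncedPull and applies
// them locally inside a transaction, skipping changes that originated from a local source ID;
// lastSyncedPull and the outstanding pull stats are updated as batches are processed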
async function pullSync(syncContext) {
    let atLeastOnePullApplied = false;

    while (true) {
        const lastSyncedPull = getLastSyncedPull();
        const changesUri = '/api/sync/changed?lastEntityChangeId=' + lastSyncedPull;

        const startDate = Date.now();

        const resp = await syncRequest(syncContext, 'GET', changesUri);
        const pulledDate = Date.now();

        stats.outstandingPulls = resp.maxEntityChangeId - lastSyncedPull;

        if (stats.outstandingPulls < 0) {
            stats.outstandingPulls = 0;
        }

        const {entityChanges} = resp;

        if (entityChanges.length === 0) {
            break;
        }

        sql.transactional(() => {
            for (const {entityChange, entity} of entityChanges) {
                if (!sourceIdService.isLocalSourceId(entityChange.sourceId)) {
                    if (!atLeastOnePullApplied && entityChange.entity !== 'recent_notes') {
                        // send the "pull in progress" event only for the first applied change
                        ws.syncPullInProgress();

                        atLeastOnePullApplied = true;
                    }

                    syncUpdateService.updateEntity(entityChange, entity, syncContext.sourceId);
                }

                stats.outstandingPulls = resp.maxEntityChangeId - entityChange.id;
            }

            setLastSyncedPull(entityChanges[entityChanges.length - 1].entityChange.id);
        });

        log.info(`Pulled ${entityChanges.length} changes starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${stats.outstandingPulls} outstanding pulls`);
    }

    if (atLeastOnePullApplied) {
        ws.syncPullFinished();
    }

    log.info("Finished pull");
}
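
// pushes local entity changes with id > lastSyncedPush to the server in batches of up to 1000 rows,
// filtering out changes that originally came from the server's own source ID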
async function pushSync(syncContext) {
    let lastSyncedPush = getLastSyncedPush();

    while (true) {
        const entityChanges = sql.getRows('SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]);

        if (entityChanges.length === 0) {
            log.info("Nothing to push");

            break;
        }

        const filteredEntityChanges = entityChanges.filter(entityChange => {
            if (entityChange.sourceId === syncContext.sourceId) {
                // this may set lastSyncedPush beyond what's actually sent (because of the size limit),
                // so this is applied to the database only if there's no actual update
                // TODO: it would be better to simplify this somehow
                lastSyncedPush = entityChange.id;

                return false;
            }
            else {
                return true;
            }
        });

        if (filteredEntityChanges.length === 0) {
            // there still might be more sync changes (because of the batch limit), just all changes
            // from the current batch have been filtered out
            setLastSyncedPush(lastSyncedPush);

            continue;
        }

        const entityChangesRecords = getEntityChangesRecords(filteredEntityChanges);
        const startDate = new Date();

        await syncRequest(syncContext, 'PUT', '/api/sync/update', {
            sourceId: sourceIdService.getCurrentSourceId(),
            entities: entityChangesRecords
        });

        log.info(`Pushing ${entityChangesRecords.length} sync changes in ` + (Date.now() - startDate.getTime()) + "ms");

        lastSyncedPush = entityChangesRecords[entityChangesRecords.length - 1].entityChange.id;

        setLastSyncedPush(lastSyncedPush);
    }
}
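
// signals to the sync server that this sync round is finished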
async function syncFinished(syncContext) {
    await syncRequest(syncContext, 'POST', '/api/sync/finished');
}
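
// compares local and server content hashes once there are no outstanding pulls or pushes;
// sectors that do not match are queued for re-sync on both sides.
// Returns true if another sync cycle is needed.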
async function checkContentHash(syncContext) {
    const resp = await syncRequest(syncContext, 'GET', '/api/sync/check');
    const lastSyncedPullId = getLastSyncedPull();

    if (lastSyncedPullId < resp.maxEntityChangeId) {
        log.info(`There are some outstanding pulls (${lastSyncedPullId} vs. ${resp.maxEntityChangeId}), skipping content check.`);

        return true;
    }

    const notPushedSyncs = sql.getValue("SELECT EXISTS(SELECT 1 FROM entity_changes WHERE isSynced = 1 AND id > ?)", [getLastSyncedPush()]);

    if (notPushedSyncs) {
        log.info("There are some outstanding pushes, skipping content check.");

        return true;
    }

    const failedChecks = contentHashService.checkContentHashes(resp.entityHashes);

    for (const {entityName, sector} of failedChecks) {
        const entityPrimaryKey = entityConstructor.getEntityFromEntityName(entityName).primaryKeyName;

        entityChangesService.addEntityChangesForSector(entityName, entityPrimaryKey, sector);

        await syncRequest(syncContext, 'POST', `/api/sync/queue-sector/${entityName}/${sector}`);
    }

    return failedChecks.length > 0;
}
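
// sends a request to the configured sync server; the proxy is used only while proxyToggle is on
// (it flips after every failed sync attempt) and the whole request is bounded by the sync timeout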
async function syncRequest(syncContext, method, requestPath, body) {
    const timeout = syncOptions.getSyncTimeout();

    const opts = {
        method,
        url: syncOptions.getSyncServerHost() + requestPath,
        cookieJar: syncContext.cookieJar,
        timeout: timeout,
        body,
        proxy: proxyToggle ? syncOptions.getSyncProxy() : null
    };

    return await utils.timeLimit(request.exec(opts), timeout);
}
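
// maps synced table names to their primary key column (used when loading rows to push)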
const primaryKeys = {
    "notes": "noteId",
    "note_contents": "noteId",
    "branches": "branchId",
    "note_revisions": "noteRevisionId",
    "note_revision_contents": "noteRevisionId",
    "recent_notes": "noteId",
    "api_tokens": "apiTokenId",
    "options": "name",
    "attributes": "attributeId"
};
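
// loads the current row for the given entity change; note_reordering is special-cased
// (it sends the positions of all branches under the parent note) and note contents
// are base64-encoded for transport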
function getEntityChangeRow(entityName, entityId) {
    if (entityName === 'note_reordering') {
        return sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [entityId]);
    }
    else {
        const primaryKey = primaryKeys[entityName];

        if (!primaryKey) {
            throw new Error("Unknown entity " + entityName);
        }

        const entity = sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);

        if (!entity) {
            throw new Error(`Entity ${entityName} ${entityId} not found.`);
        }

        if (['note_contents', 'note_revision_contents'].includes(entityName) && entity.content !== null) {
            if (typeof entity.content === 'string') {
                entity.content = Buffer.from(entity.content, 'UTF-8');
            }

            entity.content = entity.content.toString("base64");
        }

        return entity;
    }
}
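
// builds the payload for /api/sync/update: each record pairs an entity change with its row data;
// non-synced options are sent without the row and the batch is cut off at roughly 1 MB of JSON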
function getEntityChangesRecords(entityChanges) {
    const records = [];
    let length = 0;

    for (const entityChange of entityChanges) {
        const entity = getEntityChangeRow(entityChange.entityName, entityChange.entityId);

        if (entityChange.entityName === 'options' && !entity.isSynced) {
            records.push({entityChange});

            continue;
        }

        const record = {entityChange, entity};

        records.push(record);

        length += JSON.stringify(record).length;

        if (length > 1000000) {
            break;
        }
    }

    return records;
}
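
// lastSyncedPull/lastSyncedPush are persisted as options and mark the highest
// entity change ID that has been pulled from / pushed to the sync server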
function getLastSyncedPull() {
    return parseInt(optionService.getOption('lastSyncedPull'));
}

function setLastSyncedPull(entityChangeId) {
    optionService.setOption('lastSyncedPull', entityChangeId);
}

function getLastSyncedPush() {
    return parseInt(optionService.getOption('lastSyncedPush'));
}

function setLastSyncedPush(entityChangeId) {
    optionService.setOption('lastSyncedPush', entityChangeId);
}
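
// refreshes the count of entity changes still waiting to be pushed (exposed via the stats object)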
function updatePushStats() {
    if (syncOptions.isSyncSetup()) {
        const lastSyncedPush = optionService.getOption('lastSyncedPush');

        stats.outstandingPushes = sql.getValue("SELECT COUNT(1) FROM entity_changes WHERE isSynced = 1 AND id > ?", [lastSyncedPush]);
    }
}

function getMaxEntityChangeId() {
    return sql.getValue('SELECT COALESCE(MAX(id), 0) FROM entity_changes');
}
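
// once the DB is ready, sync runs every 60 seconds (with an immediate kickoff after 3 seconds)
// and the push stats are refreshed every second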
sqlInit.dbReady.then(() => {
    setInterval(cls.wrap(sync), 60000);

    // kickoff initial sync immediately
    setTimeout(cls.wrap(sync), 3000);

    setInterval(cls.wrap(updatePushStats), 1000);
});

module.exports = {
    sync,
    login,
    getEntityChangesRecords,
    stats,
    getMaxEntityChangeId
};