187 lines
5.3 KiB
JavaScript
Raw Normal View History

import utils from './utils.js';
2019-10-20 10:00:18 +02:00
import toastService from "./toast.js";
const $outstandingSyncsCount = $("#outstanding-syncs-count");
const allSyncMessageHandlers = [];
const outsideSyncMessageHandlers = [];
const messageHandlers = [];
let ws;
let lastSyncId = window.glob.maxSyncIdAtLoad;
let lastPingTs;
let syncDataQueue = [];
function logError(message) {
console.log(utils.now(), message); // needs to be separate from .trace()
console.trace();
if (ws && ws.readyState === 1) {
ws.send(JSON.stringify({
type: 'log-error',
error: message
}));
}
}
2017-12-17 13:46:18 -05:00
function subscribeToMessages(messageHandler) {
messageHandlers.push(messageHandler);
}
function subscribeToOutsideSyncMessages(messageHandler) {
outsideSyncMessageHandlers.push(messageHandler);
}
function subscribeToAllSyncMessages(messageHandler) {
allSyncMessageHandlers.push(messageHandler);
}
// used to serialize sync operations
let consumeQueuePromise = null;
async function handleMessage(event) {
const message = JSON.parse(event.data);
for (const messageHandler of messageHandlers) {
messageHandler(message);
}
if (message.type === 'sync') {
2019-10-28 19:45:36 +01:00
const syncRows = message.data;
2019-02-10 16:36:25 +01:00
lastPingTs = Date.now();
$outstandingSyncsCount.html(message.outstandingSyncs);
2019-10-28 19:45:36 +01:00
if (syncRows.length > 0) {
console.debug(utils.now(), "Sync data: ", syncRows);
2019-10-28 19:45:36 +01:00
syncDataQueue.push(...syncRows);
// first wait for all the preceding consumers to finish
while (consumeQueuePromise) {
await consumeQueuePromise;
}
// it's my turn so start it up
consumeQueuePromise = consumeSyncData();
await consumeQueuePromise;
// finish and set to null to signal somebody else can pick it up
consumeQueuePromise = null;
}
2019-10-28 19:45:36 +01:00
checkSyncIdListeners();
}
else if (message.type === 'sync-hash-check-failed') {
2019-10-20 10:00:18 +02:00
toastService.showError("Sync check failed!", 60000);
}
else if (message.type === 'consistency-checks-failed') {
2019-10-20 10:00:18 +02:00
toastService.showError("Consistency checks failed! See logs for details.", 50 * 60000);
}
}
let syncIdReachedListeners = [];
function waitForSyncId(desiredSyncId) {
if (desiredSyncId <= lastSyncId) {
return Promise.resolve();
}
return new Promise((res, rej) => {
syncIdReachedListeners.push({
desiredSyncId,
resolvePromise: res,
start: Date.now()
})
});
}
2019-10-28 19:45:36 +01:00
function checkSyncIdListeners() {
syncIdReachedListeners
.filter(l => l.desiredSyncId <= lastSyncId)
.forEach(l => l.resolvePromise());
syncIdReachedListeners = syncIdReachedListeners
.filter(l => l.desiredSyncId > lastSyncId);
syncIdReachedListeners.filter(l => Date.now() > l.start - 60000)
.forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastSyncId} for ${Date.now() - l.start}`));
}
async function consumeSyncData() {
if (syncDataQueue.length >= 0) {
const allSyncData = syncDataQueue.slice();
syncDataQueue = [];
const outsideSyncData = allSyncData.filter(sync => sync.sourceId !== glob.sourceId);
// the update process should be synchronous as a whole but individual handlers can run in parallel
await Promise.all([
...allSyncMessageHandlers.map(syncHandler => syncHandler(allSyncData)),
...outsideSyncMessageHandlers.map(syncHandler => syncHandler(outsideSyncData))
]);
lastSyncId = allSyncData[allSyncData.length - 1].id;
}
}
function connectWebSocket() {
const protocol = document.location.protocol === 'https:' ? 'wss' : 'ws';
// use wss for secure messaging
const ws = new WebSocket(protocol + "://" + location.host);
2019-07-06 12:03:51 +02:00
ws.onopen = () => console.debug(utils.now(), "Connected to server with WebSocket");
ws.onmessage = handleMessage;
2019-07-06 12:03:51 +02:00
// we're not handling ws.onclose here because reconnection is done in sendPing()
return ws;
}
setTimeout(() => {
ws = connectWebSocket();
lastSyncId = glob.maxSyncIdAtLoad;
2019-02-10 16:36:25 +01:00
lastPingTs = Date.now();
setInterval(async () => {
2019-02-10 16:36:25 +01:00
if (Date.now() - lastPingTs > 30000) {
console.log(utils.now(), "Lost connection to server");
}
2019-07-06 12:03:51 +02:00
if (ws.readyState === ws.OPEN) {
2019-06-26 20:49:17 +02:00
ws.send(JSON.stringify({
type: 'ping',
lastSyncId: lastSyncId
}));
}
2019-07-06 12:03:51 +02:00
else if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
console.log(utils.now(), "WS closed or closing, trying to reconnect");
2019-07-06 12:03:51 +02:00
ws = connectWebSocket();
}
}, 1000);
2018-04-05 23:17:19 -04:00
}, 0);
2017-12-01 22:28:22 -05:00
2019-10-25 22:20:14 +02:00
subscribeToMessages(message => {
if (message.type === 'sync-pull-in-progress') {
toastService.showPersistent({
id: 'sync',
title: "Sync status",
message: "Sync update in progress",
icon: "refresh"
});
}
2019-10-28 19:45:36 +01:00
else if (message.type === 'sync-pull-finished') {
2019-10-28 20:26:40 +01:00
// this gives user a chance to see the toast in case of fast sync finish
setTimeout(() => toastService.closePersistent('sync'), 1000);
2019-10-25 22:20:14 +02:00
}
});
export default {
logError,
subscribeToMessages,
subscribeToAllSyncMessages,
subscribeToOutsideSyncMessages,
waitForSyncId
};