'use strict';
import { Buffer } from 'buffer';
import { errorLogger} from './utils/requestLogger.js';
import { isObjectVisible } from './utils/authChecks.js';
import { HEX2uuid, UUID2hex } from '@commtool/sql-query';
import { normalizeUserSession } from '@commtool/shared-auth';
// @ts-check
/**
* @import {ExpressRequest, ExpressResponse} from './types.js'
*/
/**
* @typedef {Object} SocketIOServer - Socket.IO server instance
*/
/**
* @typedef {Object} SocketIOSocket - Socket.IO socket connection
* @property {Object} request - HTTP request object
* @property {Object} data - Socket data storage
* @property {string} id - Socket connection ID
* @property {boolean} connected - Whether socket is connected
* @property {function(string, any): void} emit - Emit event to socket
* @property {function(boolean): void} disconnect - Disconnect socket
*/
/** @type {SocketIOServer|null} */
let ioInstance
/** @type {Map<string, any>} */
export const clientDataStore = new Map();
// Delayed update timestamps for batching updates
/** @type {Map<string, number>} */
export const delayedUpdateTimestamps = new Map();
// Debounce delay for updates (2.5 seconds)
/** @type {number} */
const debounceDelay = 7000;
/**
* Sets up Socket.IO event handlers
* @param {SocketIOServer} io - Socket.IO server instance
*/
export const setupSocketHandlers = (io) => {
ioInstance = io;
io.on('connection', (socket) => {
try {
// User is set by createSocketAuthMiddleware from Bearer token
const userInfo = socket.data.user;
if (!userInfo) {
socket.disconnect(true);
console.error('Socket connection without valid authentication');
return;
}
const user = userInfo.userUID || userInfo.id;
// Get client ID from query parameters
const id = socket.handshake.query.id;
if (!id) {
socket.disconnect(true);
console.error('Socket connection without client ID');
return;
}
// Get organization from header or user token
const orgFromHeader = socket.handshake.headers['x-organization'];
const orgUID = orgFromHeader || userInfo.organization || userInfo.loginOrgaUID;
if (!orgUID || orgUID === 'undefined') {
socket.disconnect(true);
console.error('Socket connection without organization context');
return;
}
// Create a unique ID for this client
const UID = 'UUID-' + id;
const socketID = Buffer.from(UID).toString('base64');
console.log(`Client connected: ${user} (${userInfo.source}), org: ${orgUID}, socketID: ${socketID}`);
// Store connection info
socket.data.connectID = socketID;
socket.data.user = user;
socket.data.userInfo = userInfo;
socket.data.root = orgUID;
socket.data.userHex = UUID2hex(user);
// Create a session object for compatibility with authChecks functions
// Using normalizeUserSession from auth-lib for consistency
const normalizedUser = normalizeUserSession(userInfo, userInfo.source || 'websocket');
socket.request.session = {
user: user,
baseUser: userInfo.baseUserUID || user,
root: orgUID,
loginOrgaUID: userInfo.loginOrgaUID || orgUID,
authUser: normalizedUser,
};
// Also set organization in headers for other middleware
if (!socket.request.headers) {
socket.request.headers = {};
}
socket.request.headers['x-organization'] = orgUID;
// Join a room for this specific client
socket.join(socketID);
// Create client data entry
clientDataStore.set(socketID, {
updateBuffer: null,
updateAbo: null,
objectAbo: [],
messageBuffer: [],
UIDuser: user,
UIDroot: orgUID,
orgSource: orgFromHeader ? 'header' : (socket.request.session.root ? 'session' : 'user-default'),
connectedAt: Date.now()
});
// Handle transactions
socket.on('transaction', (data) => {
socket.data.transaction = data.start;
socket.emit('transactionAck', { received: true });
});
// Handle list monitoring
socket.on('monitor', async (data) => {
if (data && data.UID) {
await addListAbo(socketID, data.UID, socket.request);
socket.emit('monitorAck', { success: true });
}
});
// Handle object monitoring
socket.on('monitorObject', async (data) => {
if (data && data.UID) {
await addObjectAbo(socketID, data.UID, socket.request);
socket.emit('monitorObjectAck', { success: true });
}
});
// Replace your ping/pong with a custom event
socket.on('setRoot', (data) => {
if (data && data.UIDroot) {
socket.data.root = data.UIDroot;
socket.data.rootHex = UUID2hex(data.UIDroot);
// Update client data store
const clientData = clientDataStore.get(socketID);
if (clientData) {
clientData.UIDroot = data.UIDroot;
clientData.orgSource = 'dynamic';
}
}
socket.emit('rootSet', { success: true, root: socket.data.root });
// Send any buffered messages
sendMessageBuffered(socket, null);
});
// Handle client disconnect
socket.on('disconnect', (reason) => {
console.log(`Client disconnected: ${socket.data.user}, reason: ${reason}`);
// Keep client data in store for possible reconnection
});
// Handle connection errors
socket.on('error', (error) => {
console.error(`Socket error for client ${socketID}:`, error);
});
} catch (e) {
console.error('Error in Socket.IO connection handler:', e);
socket.disconnect(true);
}
});
};
/**
* Sends a message to a client, buffering it if the client is not ready
* @param {SocketIOSocket} socket - Socket.IO socket
* @param {Object} message - Message to send
*/
export const sendMessageBuffered = (socket, message) => {
try {
// Make sure we have a valid socket and connection ID
if (!socket || !socket.data || !socket.data.connectID) {
return;
}
const clientData = clientDataStore.get(socket.data.connectID);
if (!clientData) {
return;
}
// If socket is not connected, Socket.IO will automatically buffer the message
// for future delivery when connection is restored
if (message) {
if (message.update) {
// For updates, we only care about the latest one, so store it
clientData.updateBuffer = { UID: message.UID, timestamp: message.timestamp };
// Only send if connected
if (socket.connected) {
socket.emit('update', {
update: true,
UID: message.UID,
timestamp: message.timestamp
});
clientData.updateBuffer = null;
}
} else {
// Regular messages can be sent directly - Socket.IO will buffer if needed
socket.emit('message', message);
}
} else if (socket.connected) {
// This is a request to flush any pending messages
// Send update buffer if it exists
if (clientData.updateBuffer) {
socket.emit('update', {
update: true,
UID: clientData.updateBuffer.UID,
timestamp: clientData.updateBuffer.timestamp
});
clientData.updateBuffer = null;
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Adds an object to the client's subscription list
* @param {string} socketID - Client's socket ID
* @param {string|string[]} UIDs - Object UIDs to monitor
* @param {Object} req - Request object for authorization
*/
/**
* Adds object subscriptions for a socket client
* @param {string} socketID - The socket connection ID
* @param {string|string[]|'quit'} UIDs - Object UIDs to subscribe to or 'quit' to clear all
* @param {Object} req - Request object for authorization
* @returns {Promise<void>}
*/
export const addObjectAbo = async (socketID, UIDs, req) => {
try {
const clientData = clientDataStore.get(socketID);
if (!clientData) return;
if (UIDs === 'quit') {
clientData.objectAbo = [];
} else {
const UIDobjects = Array.isArray(UIDs) ? UIDs.map(U => UUID2hex(U)) : [UUID2hex(UIDs)];
let myAbos = [];
for (const myUID of UIDobjects) {
if (await isObjectVisible(req, myUID)) {
myAbos.push(myUID);
}
}
clientData.objectAbo = myAbos;
}
} catch (e) {
errorLogger(e);
}
};
/**
* Adds a list to the client's subscription
* @param {string} socketID - Client's socket ID
* @param {string|'quit'} UID - List UID to monitor or 'quit' to unsubscribe
* @param {Object} req - Request object for authorization
* @returns {Promise<void>}
*/
export const addListAbo = async (socketID, UID, req) => {
try {
const clientData = clientDataStore.get(socketID);
if (!clientData) return;
if (UID === 'quit') {
clientData.updateAbo = null;
} else {
const UIDlist = UUID2hex(UID);
if (await isObjectVisible(req, UIDlist)) {
clientData.updateAbo = UIDlist;
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Updates the queue status for all clients with matching root
* @param {string|Buffer} root - The root identifier to match clients against
* @param {any} status - The new queue status to set
*/
export const updateQueueStatus = (root, status) => {
try {
if (!ioInstance) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.UIDroot &&
UUID2hex(clientData.UIDroot).equals(UUID2hex(root))) {
const socket = ioInstance.in(socketID);
socket.emit('message', { root: root, queueStatus: status });
// Also update the client data
clientData.queueStatus = status;
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Sends an update template message to clients with matching root
* @param {Buffer|string} root - The root identifier to match clients against
*/
export const updateTemplate = (root) => {
try {
if (!ioInstance) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.UIDroot &&
UUID2hex(clientData.UIDroot).equals(UUID2hex(root))) {
const socket = ioInstance.in(socketID);
socket.emit('message', {
root: root,
updateTemplate: Date.now()
});
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Updates configuration for clients with matching root and app type
* @param {string|object} app - The application configuration to update
* @param {Buffer|string} root - The root identifier to match against client roots
*/
export const updateConfig = (app, root) => {
try {
if (!ioInstance) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.UIDroot &&
UUID2hex(clientData.UIDroot).equals(UUID2hex(root))) {
const socket = ioInstance.in(socketID);
socket.emit('message', {
updateConfig: Date.now(),
app
});
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Sends fake user data to clients associated with a specific user
* @param {Object} session - The session object containing user information
*/
export const wsFakeUser = (session) => {
try {
if (!ioInstance || !session || !session.user) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.UIDuser &&
clientData.UIDuser===session.user) {
const socket = ioInstance.in(socketID);
socket.emit('message', {
baseUser: session.baseUser,
user: session.user,
fakeLogin: session.fakeLogin
});
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Signals template change for the event app for clients with matching root
* @param {Buffer|string} root - The root identifier to match against clients
*/
export const updateTemplateEvent = (root) => {
try {
if (!ioInstance || !root) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.UIDroot &&
UUID2hex(clientData.UIDroot).equals(UUID2hex(root))) {
const socket = ioInstance.in(socketID);
socket.emit('message', {
root: root,
updateTemplateEvent: Date.now()
});
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Sends progress status to clients with matching root
* @param {string|Buffer} root - The root identifier to match clients against
* @param {Object} status - The maintenance status object to send
*/
export const sendProgressStatus = (root, status) => {
try {
if (!ioInstance || !root) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.UIDroot &&
UUID2hex(clientData.UIDroot).equals(UUID2hex(root))) {
const socket = ioInstance.in(socketID);
socket.emit('message', {progressStatus: status });
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Sends an update message with the data to all clients monitoring an object
* @param {string|Buffer} UID - The unique identifier of the object
* @param {*} value - The value to be sent in the update message
*/
export const addUpdateEntry = (UID, value) => {
try {
if (!ioInstance) return;
const myUID = UUID2hex(UID);
if (!myUID) return;
for (const [socketID, clientData] of clientDataStore) {
if (Array.isArray(clientData.objectAbo) &&
clientData.objectAbo.some(oa => oa && oa.equals(myUID))) {
const socket = ioInstance.in(socketID);
socket.emit('message', { object: true, value: value });
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Sends the UUID's of a List which needs to be updated in the client
* @param {string|Buffer|string[]|Buffer[]} values - Single UUID string or array of UUID strings
* @returns {boolean} - Returns true if processed successfully, false on error
*/
export const addUpdateList = (values) => {
try {
const updates = Array.isArray(values)
? values.filter(v => v).map(v => UUID2hex(v))
: values ? [UUID2hex(values)] : [];
if (updates.length > 0) {
const timestamp = Date.now();
for (const update of updates) {
const updateKey = update.toString('base64');
// Check if this update is already being debounced
if (delayedUpdateTimestamps.has(updateKey)) {
// Update already queued, just refresh its timestamp
delayedUpdateTimestamps.set(updateKey, timestamp);
} else {
// First update for this list - send immediately and start debouncing
delayedUpdateTimestamps.set(updateKey, timestamp);
// Send immediately to subscribed clients
for (const [socketID, clientData] of clientDataStore) {
if (clientData.updateAbo && clientData.updateAbo.equals(update)) {
const socket = ioInstance.in(socketID);
socket.emit('message', {
update: true,
UID: HEX2uuid(update),
timestamp: timestamp
});
}
}
}
}
return true;
}
return false;
} catch (e) {
errorLogger(e);
return false;
}
};
/**
* Monitors delayed updates and sends them if they're older than the threshold
* Uses the global ioInstance to send updates
* @returns {void}
*/
export const monitorDelayedUpdates = () => {
try {
if (!ioInstance) return;
const debounceDelay = 2500; // 2.5 seconds debounce
const now = Date.now();
// Check all delayed updates
for (const [UIDlistB64, timestamp] of delayedUpdateTimestamps) {
// If update has been delayed long enough, process it
if (now - timestamp > debounceDelay) {
try {
const UIDlist = Buffer.from(UIDlistB64, 'base64');
// Find all clients that have subscribed to this list
for (const [socketID, clientData] of clientDataStore) {
if (clientData.updateAbo &&
Buffer.isBuffer(clientData.updateAbo) &&
clientData.updateAbo.equals(UIDlist)) {
const socket = ioInstance.in(socketID);
// Send update notification
socket.emit('message', {
update: true,
UID: HEX2uuid(UIDlist),
timestamp: now
});
}
}
// Remove this update from the delayed updates map
delayedUpdateTimestamps.delete(UIDlistB64);
} catch (innerError) {
console.error('Error processing delayed update:', innerError);
}
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Broadcasts an update to all clients monitoring a specific list (by updateAbo)
* @param {string|Buffer} UID - The unique identifier of the list
* @param {number} timestamp - The timestamp of the update
*/
export const broadcastUpdate = (UID, timestamp = Date.now()) => {
try {
if (!ioInstance) return;
const myUID = UUID2hex(UID);
if (!myUID) return;
for (const [socketID, clientData] of clientDataStore) {
if (clientData.updateAbo && clientData.updateAbo.equals(myUID)) {
const socket = ioInstance.in(socketID);
socket.emit('update', {
update: true,
UID: HEX2uuid(myUID),
timestamp
});
}
}
} catch (e) {
errorLogger(e);
}
};
/**
* Handles sending a transaction message to a specific client
* @param {string} socketID - The socket ID of the client
* @param {object} data - The transaction data to send
*/
export const sendTransaction = (socketID, data) => {
try {
if (!ioInstance) return;
const socket = ioInstance.in(socketID);
socket.emit('transactionComplete', data);
} catch (e) {
errorLogger(e);
}
};
/**
* Cleans up all client data and buffers (e.g., on shutdown)
*/
export const cleanupSocketData = () => {
try {
clientDataStore.clear();
delayedUpdateTimestamps.clear();
} catch (e) {
errorLogger(e);
}
};
// Update the default export to include all functions
export default {
clientDataStore,
delayedUpdateTimestamps,
setupSocketHandlers,
sendMessageBuffered,
broadcastUpdate, // <-- added
sendTransaction, // <-- added
monitorDelayedUpdates,
cleanupSocketData, // <-- added
updateQueueStatus,
updateTemplate,
updateConfig,
wsFakeUser,
updateTemplateEvent,
sendProgressStatus,
addUpdateEntry,
addUpdateList,
};