import redis from 'redis'
import { UUID2hex,HEX2uuid, query } from '@kpe-back/sql-query';
import { errorLoggerUpdate, errorLoggerRead} from './Router/helpers/requestLogger.js';
export const wsStore = redis.createClient({ socket: { port: 6379, host: process.env.NODE_ENV === 'production' ? 'redis' : 'redis-dev' } })
wsStore.on('error', err => console.log('client error', err));
wsStore.on('connect', () => console.log('client is connect'));
wsStore.on('reconnecting', () => console.log('client is reconnecting'));
wsStore.on('ready', () => console.log('client is ready'));
wsStore.connect()
export const wsClients = new Map()
export const clientDataStore = new Map()
export const delayedUpdateTimestamps = new Map()
/**
* Sends a message to a WebSocket client with buffering capability.
* If the connection is alive, sends the message immediately and flushes any buffered messages.
* If the connection is not alive, buffers the message for later delivery.
*
* @param {WebSocket} ws - The WebSocket connection object with a connectID property
* @param {Object} message - The message to send
* @param {boolean} [message.update] - If true, the message is treated as an update message
* @param {string} [message.UID] - Unique identifier for update messages
* @param {number} [message.timestamp] - Timestamp for update messages
*
* @throws {Error} Throws an error if WebSocket operations fail
*
* @description
* The function handles two types of messages:
* 1. Regular messages - Added to messageBuffer array
* 2. Update messages - Stored in updateBuffer, replacing any existing update
*
* When connection is alive (ws.notAlive === 0):
* - Sends current message if exists
* - Sends any buffered update message
* - Sends all buffered regular messages
*/
export const sendMessageBuffered = (ws, message) =>
{
try {
const wsID = ws.connectID
const clientData=clientDataStore.get(wsID)
if(!clientData)
return
if (ws.notAlive === 0)
{
// web socket connection is alive
if (message) {
// send the current message out,
ws.send(JSON.stringify(message))
}
// empty buffers, if they exits
// check , if we have an update message in the buffer, in case, we send it now
if (clientData.updateBuffer) {
ws.send(JSON.stringify({ update: true,... clientData.updateBuffer }))
// console.log('message buffer update buffer')
// and delete the updateBuffer key in redis
clientData.updateBuffer=null
}
// now send out the other messages
if(clientData.messageBuffer && clientData.messageBuffer.length>0)
{
for(const bMessage of clientData.messageBuffer)
{
ws.send(JSON.stringify(bMessage))
}
clientData.messageBuffer=[]
}
}
else if (message)
{
// buffer the messages in case the client is becoming online again
// get existing buffer
// store only one update buffer
if(message.update)
{
// replace the current updateBuffer
clientData.updateBuffer={UID:message.UID,timestamp:message.timestamp}
}
else
{
clientData.messageBuffer=[...clientData.messageBuffer,message]
}
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Sends the UUID's of a List, which needs to be updated in the client
* @param {string|string[]} values - Single UUID string or array of UUID strings to be processed
* @returns {Promise<boolean>} Returns true if updates were processed, false if no valid updates
* @throws {Error} Throws and logs any errors that occur during processing
*/
export const addUpdateList = async (values) => {
try {
const updates = Array.isArray(values) ? values.filter(v=>v).map(v=>UUID2hex(v)) : values? [UUID2hex(values)] : [] ;
if(updates.length > 0) {
// Synchrone Operationen...
// Warte auf einen kleinen Sleep, um sicherzustellen, dass alle Nachrichten verarbeitet werden
await new Promise(resolve => setTimeout(resolve, 10));
return true; // Explizite Rückgabe
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Sends an update message with the data to be updated to all clients monitoring an object with the specified UID.
* @param {string|UUID} UID - The unique identifier of the object. Can be a string or UUID.
* @param {*} value - The value to be sent in the update message.
* @throws {Error} - Any error that occurs during the update process will be logged via errorLoggerRead.
* @returns {Promise<void>}
*/
export const addUpdateEntry = async (UID, value) => {
try {
// send update message containing value to all clients monitoring update of objects with this UID immediately
const myUID = UUID2hex(UID) // allow string UID's as UUID2hex handles this
// object:UID is a set of all cients requesting updates for this object
if(myUID)
{
for (const [wsKey,clientData] of clientDataStore)
{
if(clientData.objectAbo.some(oa=>oa && oa.equals(myUID)))
{
sendMessageBuffered(wsClients.get(wsKey), { object: true, value: value })
}
}
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Sends an update template message to WebSocket clients associated with a specific root.
*
* @param {Buffer|string} root - The root identifier to match clients against
* @returns {Promise<void>}
*
* @throws {Error} - Forwards any errors to errorLoggerRead
*
* @description
* Iterates through all WebSocket clients and sends an update template message
* to those whose root organisation matches the provided root parameter (the organisation for which a template changes).
* The message includes the root and current timestamp.
*/
export const updateTemplate = async (root) => {
//
try {
for (const client of wsClients.values()) {
if(client.root?.equals(UUID2hex(root)))
sendMessageBuffered(client, { root: root, updateTemplate: Date.now() })
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Updates configuration for WebSocket clients with matching root organisation and matching app type
* Is triggered when for this organisation the yaml configuration file is update
* @param {string|object} app - The application configuration to update.
* @param {Buffer|string} root - The root identifier to match against client roots.
* @returns {Promise<void>} A promise that resolves when the update is complete.
* @throws {Error} If there's an error during the update process, it will be logged via errorLoggerRead.
*/
export const updateConfig = async (app,root) => {
//
try {
for (const client of wsClients.values()) {
if(client.root?.equals(UUID2hex(root)))
sendMessageBuffered(client, { updateConfig: Date.now(), app })
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Sends fake user data to WebSocket clients associated with a specific user session
* Is triggered when an administraor faking a login -simulating for himself being logged in as another user-
*
* @param {Object} session - The session object containing user information
* @param {Object} session.baseUser - The base user information
* @param {string} session.user - The user identifier
* @param {boolean} session.fakeLogin - Flag indicating if this is a fake login
* @throws {Error} If there's an error during WebSocket communication
* @async
*/
export const wsFakeUser = async (session) => {
//
try {
if(session && session.user)
for (const client of wsClients.values()) {
if(client.user.equals(UUID2hex(session.user)))
sendMessageBuffered(client, { baseUser: session.baseUser, user: session.user, fakeLogin: session.fakeLogin})
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Signals template change for the event app for connected WebSocket clients with matching root organisation.
* Sends a buffered message containing the root and current timestamp to each matching client.
*
* @param {Buffer|string} root - The root identifier to match against clients
* @returns {Promise<void>}
* @throws {Error} Logs error if message sending fails
*/
export const updateTemplateEvent = async (root) => {
//
if(root)
{
try {
for (const client of wsClients.values()) {
if(client.root?.equals(UUID2hex(root)))
sendMessageBuffered(client, { root: root, updateTemplateEvent: Date.now() })
}
}
catch(e)
{
errorLoggerRead(e)
}
}
}
/**
* Updates the queue status (no of opentasks in the queue loop) for WebSocket clients with matching root.
* @param {string|Buffer} root - The root identifier to match clients against
* @param {string|object} status - The queue status to send to matching clients
* @throws {Error} - Logs error via errorLoggerRead if operation fails
*/
export const updateQueueStatus= (root,status)=>
{
try {
for (const client of wsClients.values()) {
if(client.root?.equals(UUID2hex(root)))
{
sendMessageBuffered(client, { root: root, queueStatus: status })
}
}
}
catch(e)
{
errorLoggerRead(e)
}
}
/**
* Sends maintenance status to WebSocket clients with matching root organisation.
* Used to update status of maintenance operations. in the admin app
*
* @param {string|Buffer} root - The root identifier to match clients against
* @param {Object} status - The maintenance status object to send
* @param {string} status.type - Type of the message, set to 'maintenanceStatus'
* @throws {Error} If there's an error sending the status
*/
export const sendProgressStatus = (root, status) => {
try {
for (const client of wsClients.values()) {
if(client.root?.equals(UUID2hex(root))) {
sendMessageBuffered(client, {
maintenanceStatus: status
});
}
}
} catch(e) {
errorLoggerRead(e);
}
}