// @ts-check
/**
* @typedef {Object} PublishEventOptions
* @property {string} organization - The organization UUID for multi-tenant scoping
* @property {any[] | any} data - The event data payload
* @property {number} [backDate] - Unix timestamp for backdated events
*/
import { query } from '@commtool/sql-query';
import redis from 'redis'
import { errorLogger } from './requestLogger.js';
/** @type {redis.RedisClientType | null} */
export let pubClient = null
/** @type {string} */
let redisUrl = 'unknown'
/** @type {boolean} */
let isInitialized = false
/**
* Initialize the Redis connection after secrets are loaded
* @async
* @function initializeRedis
* @returns {Promise<boolean>} Returns true if initialized successfully, false otherwise
*/
export async function initializeRedis() {
if (isInitialized) return true;
try {
// Import the merged secrets after they've been loaded
const {mergedSecrets} = await import('../http-server.js');
// Check if secrets are available yet
if (!mergedSecrets || !mergedSecrets.redisServer) {
// Secrets not loaded yet - this is expected during early bootstrap
return false;
}
// the messaging system a seperate redis client is recommended
redisUrl=`redis://${mergedSecrets.redisServer}:${mergedSecrets.redisPort || 6379}`;
pubClient = redis.createClient({
url: redisUrl,
password: mergedSecrets.redisPassword || undefined,
legacyMode: false,
socket: {
connectTimeout: 10000, // 10 seconds
reconnectStrategy: (retries) => {
if (retries > 5) return new Error('Max retries reached');
return Math.min(retries * 1000, 5000); // Exponential backoff
}
}
});
await pubClient.connect();
pubClient.on('error', /** @param {Error} err */ (err) => console.log('client error', err));
console.log(`[pubSubClient] Connected to Redis at ${redisUrl}`);
isInitialized = true;
return true;
} catch (err) {
const errorMessage = `[pubSubClient] Failed to connect to Redis at ${redisUrl || 'unknown'}`;
console.error(errorMessage, err);
errorLogger(err);
throw err;
}
}
/**
* Publishes an event to Redis with multi-tenant organization scoping
* @param {string} eventKey - The event key/channel to publish to
* @param {object} options - Publishing options
* @param {string} options.organization - The organization UUID for multi-tenant scoping
* @param {any[] | any} options.data - The event data payload
* @param {number} [options.backDate] - Unix timestamp for backdated events
* @returns {Promise<void>}
*/
export const publishEvent=async (eventKey, options)=>
{
try {
// Initialize Redis if not already done
if (!isInitialized) {
const initialized = await initializeRedis();
if (!initialized) {
// Redis not ready yet (secrets not loaded) - silently skip during bootstrap
console.log(`[publishEvent] Skipping event '${eventKey}' - Redis not initialized yet (early bootstrap)`);
return;
}
}
if(!pubClient) {
console.warn(`[publishEvent] No Redis client available, skipping event '${eventKey}'`);
return;
}
} catch(err) {
console.error('[publishEvent] Redis client error:', err);
errorLogger(err);
return;
}
// Type-safe destructuring with proper validation
if (typeof eventKey !== 'string') {
throw new TypeError(`Event key must be a string, got ${typeof eventKey}`);
}
if (typeof options !== 'object' || options === null) {
throw new TypeError(`Options must be an object, got ${typeof options}`);
}
// Extract options with defaults and validation
const { organization = null, data = null, backDate = null } = options;
if (organization !== null && typeof organization !== 'string') {
console.error(`Organization must be a string or null, got ${typeof organization}`);
return;
}
if (backDate !== null && typeof backDate !== 'number') {
console.error(`BackDate must be a number or null, got ${typeof backDate}`);
return;
}
//console.log('publishing to '+eventKey)
// we always send the orga as well
const UIDorga = organization ? organization : 'UUID-00000000-0000-0000-0000-000000000000'
const myData = data !== null || backDate !== null ?
JSON.stringify({data: data, UIDorga, backDate: backDate ? backDate : Math.floor(Date.now()/1000)}) :
JSON.stringify({UIDorga})
pubClient.publish(eventKey, myData).catch(/** @param {Error} err */ err => errorLogger(err));
console.log(eventKey)
query(`INSERT INTO eventLog (EventKey,Data) VALUES(?,?)
ON DUPLICATE KEY UPDATE EventKey=VALUE(EventKey)
`, [eventKey, myData])
}