/**
* @module server
* @description Main server application for Commtool Members API
*
* This module initializes and configures the HTTP and WebSocket server with:
* - Express.js HTTP server with REST API endpoints
* - Socket.IO WebSocket server for real-time communication
* - Authentication and session management
* - Database connections and initialization
* - Error handling and logging
* - Graceful shutdown handling
*
* The server provides both REST API endpoints and real-time WebSocket connections
* for managing organization data, user authentication, and collaborative features.
*/
'use strict';
// @ts-check
/**
* @import {ExpressRequest, ExpressResponse} from './types.js'
*/
import http from 'http';
import { Server } from 'socket.io';
import errorHandler from './errorHander.js';
import { errorLogger, configLoggers} from './utils/requestLogger.js';
import { loadSecretsFromVault } from '@commtool/vault-secrets';
import { loadOrganizationDomains, createCorsOriginChecker, createSocketAuthMiddleware } from '@commtool/shared-auth';
import { initS3 } from './utils/s3Client.js';
/**
* Initialize the application by loading secrets and setting up modules
* @async
* @function initializeApp
* @returns {Promise<object>} Object containing all the imported module functions
*/
const initializeApp = async () => {
// Import required modules
await loadSecretsFromVault();
const { default: query, HEX2uuid, recreateMainConnection, recreatePool, UUID2hex, UUID2base64, createMainConnection } = await import('@commtool/sql-query');
// Initialize database connection after secrets are loaded
console.log('Creating main database connection...');
await createMainConnection();
console.log('Database connection established successfully');
configLoggers();
const { createDB, initOrga, checkOrgaExists } = await import('./config/initOrga.js');
await createDB(process.env.DB_DATABASE);
const { readAllConfigs } = await import('./utils/compileTemplates.js');
// Import Socket.IO helper functions
const {
clientDataStore,
setupSocketHandlers,
broadcastUpdate,
sendTransaction,
monitorDelayedUpdates,
cleanupSocketData
} = await import('./server.ws.js');
// Import tree processing modules
const treeQueue = await import('./tree/treeQueue/treeQueue.js');
const treeMigrate = await import('./tree/treeQueue/treeMigrate.js');
console.log(process.env);
return {
readAllConfigs,
checkOrgaExists,
setupSocketHandlers,
broadcastUpdate,
sendTransaction,
monitorDelayedUpdates,
cleanupSocketData,
triggerQueues: treeQueue.triggerQueues,
migrateAction: treeMigrate.migrateAction
};
};
/** @type {any} */
export let io;
/** @type {NodeJS.Timeout|null} */
let monitorInterval;
// Global variables for exported functions
let broadcastUpdate, sendTransaction;
/**
* Initialize the server with HTTP and WebSocket capabilities
* @async
* @function initServer
* @param {boolean} test - Whether this is a test environment
* @returns {Promise<void>}
*/
const initServer = async (test) => {
// Get the initialized modules
const modules = await initializeApp();
// Assign functions to global variables for export
broadcastUpdate = modules.broadcastUpdate;
sendTransaction = modules.sendTransaction;
await initS3();
const orgas = await modules.readAllConfigs();
// Initialize Redis connection for events system (after secrets are loaded)
const { initializeRedis } = await import('./utils/events.js');
await initializeRedis();
// Initialize queue action handlers BEFORE checkOrgaExists, as initOrga may call queueAdd
const { triggerQueues } = await import('./tree/treeQueue/treeQueue.js');
await triggerQueues(); // Initialize action handlers and process any leftover queue items
await Promise.all(orgas.map(async (UIDorga) => (
modules.checkOrgaExists(UIDorga)
)));
// Create HTTP server
const { createApp } = await import('./http-server.js');
const app = await createApp();
const server = http.createServer(app);
// Load organization domains from Vault for Socket.IO CORS
const appBaseDomain = process.env.APP_BASE || 'commtool.org';
const domainOrgMap = await loadOrganizationDomains(appBaseDomain);
const legacyDomains = ['adbmv.de', 'kpe.de'];
// Create Socket.IO server with proxy and CORS settings
io = new Server(server, {
cors: {
origin: (createCorsOriginChecker({
domainOrgMap,
additionalDomains: legacyDomains,
builtInDomains: [appBaseDomain, ...legacyDomains]
})),
methods: ["GET", "POST", "OPTIONS"],
allowedHeaders: ["Content-Type", "Authorization", "Pragma", "Cache-Control", "X-Organization"],
exposedHeaders: ["X-Organization"],
credentials: true
},
path: '/socket.io/',
transports: ['polling', 'websocket'], // Put polling first so it doesn't try to upgrade if websocket fails
// Shorter polling duration
pingInterval: 10000, // Check connection every 10 seconds
pingTimeout: 5000, // Wait 5 seconds for ping response
connectTimeout: 10000, // Wait 10 seconds for initial connection
// Polling optimizations
maxHttpBufferSize: 1e6, // 1MB
httpCompression: true,
// Prevent polling requests from hanging
allowUpgrades: true,
upgradeTimeout: 5000, // Timeout for transport upgrades
// Shorter polling cycle
cookie: false // Disable cookie to reduce overhead
});
// Add this right after creating the io instance
// Monkey patch Socket.IO engine to directly add CORS headers
// Socket.IO Bearer Token Authentication
io.use(createSocketAuthMiddleware({ required: true }));
// Log authenticated connections
io.on('connection', (socket) => {
const user = socket.data.user;
if (user) {
console.log(`Socket connected: ${socket.id} - User: ${user.username || user.email || user.id}`);
}
});
// Add error handling for the engine
io.engine.on('connection_error', (err) => {
console.error('Socket.IO engine connection error:', {
code: err.code,
message: err.message,
context: err.context,
reqUrl: err.req?.url,
headers: err.req?.headers
});
});
// Add error event handler at engine level
io.engine.on('error', (err) => {
console.error('Socket.IO engine error:', err);
})
// Add detailed upgrade logging
io.engine.on('connection', (socket) => {
const startTime = Date.now();
const transport = socket.transport.name;
console.log(`[Socket.IO Engine] New connection using ${transport} transport:`, socket.id);
// Check if WebSocket is supported
const wsSupported = socket.request &&
socket.request.headers &&
socket.request.headers.upgrade &&
socket.request.headers.upgrade.toLowerCase() === 'websocket';
console.log(`[Socket.IO Engine] WebSocket supported for ${socket.id}: ${wsSupported}`);
// Monitor upgrade events
socket.on('upgrading', (transport) => {
console.log(`[Socket.IO Engine] Connection ${socket.id} upgrading to ${transport.name} after ${Date.now() - startTime}ms`);
});
socket.on('upgrade', (transport) => {
console.log(`[Socket.IO Engine] Connection ${socket.id} successfully upgraded to ${transport.name} after ${Date.now() - startTime}ms`);
});
socket.on('upgradeError', (err) => {
console.error(`[Socket.IO Engine] Upgrade error for ${socket.id} after ${Date.now() - startTime}ms:`, err);
});
});
// Enable more detailed debug logs for Socket.IO engine
io.engine.on('connection', (socket) => {
// Log whenever a transport probe is requested
socket.transport.on('probe', () => {
console.log(`[Socket.IO Engine] Transport probe request for ${socket.id}`);
});
// Log whenever a packet is received
socket.on('packet', (packet) => {
if (packet.type === 'ping' || packet.type === 'pong') return; // Skip logging noisy ping/pong
console.log(`[Socket.IO Engine] Packet received by ${socket.id}: type=${packet.type}`);
});
});
// Also check if the upgrade header is present in the initial connection
server.on('upgrade', (req, socket, head) => {
console.log('[HTTP Server] WebSocket upgrade request received:', {
path: req.url,
headers: {
upgrade: req.headers.upgrade,
connection: req.headers.connection,
origin: req.headers.origin
}
});
});
// Set up websocket handling
modules.setupSocketHandlers(io);
// Initialize tree queue processor (this will trigger existing queues)
await modules.triggerQueues();
// Monitor delayed updates (similar to your previous monitorInterval)
monitorInterval = setInterval(() => modules.monitorDelayedUpdates(io), 1000);
try {
// Start the server
server.listen(3000, () => {
// Trigger all queues once to execute leftovers from last shut-down
triggerQueues();
console.log("HTTP and Socket.IO server is running on port 3000.");
// Set up error handler
console.error = errorHandler;
// Disable console.log in production
if (process.env.LOGGER !== 'dev') {
console.log('Console logging disabled in production mode, set environment variable LOGGER=dev to re-enable it');
console.log = function () { };
}
});
} catch (e) {
console.error(e);
errorLogger(e);
}
// Add graceful shutdown handlers
const gracefulShutdown = () => {
console.log('Shutting down server gracefully...');
// Clear the monitor interval
if (monitorInterval) {
clearInterval(monitorInterval);
}
// Close the HTTP server
server.close(() => {
console.log('HTTP server closed');
// Close Socket.IO
io.close(() => {
console.log('Socket.IO server closed');
// Clean up any remaining data
modules.cleanupSocketData();
// Exit after cleanup
process.exit(0);
});
});
};
// Register shutdown handlers
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
};
// Initialize the server
initServer(false);
// Export functions for external use
export { broadcastUpdate, sendTransaction };