Source: Router/maintenance/compressHistory.js

// @ts-check

/**
 * compressHistory.js — History compression for system-versioned tables
 *
 * Adapted from compress_v2.mjs for the members-back maintenance API.
 *
 * Approach:
 *   1. CREATE TABLE {table}_new LIKE {table}
 *   2. Chunk-wise SELECT FOR SYSTEM_TIME ALL, group by entity keys,
 *      squeeze consecutive duplicate history rows
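 *   3. INSERT IGNORE into {table}_new — history rows with explicit ValidFrom/ValidUntil
 *      (system_versioning_insert_history = ON), current rows with @@timestamp pinned
 *      to their original ValidFrom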
 *   4. Skip invalid ranges (ValidFrom >= ValidUntil)
 *   5. RENAME TABLE swap (one statement, at end — only with ?swap=true)
 *
 * Query params:
 *   ?apply=true   — actually create _new tables and compress (default: dry-run)
 *   ?swap=true    — also perform the RENAME TABLE swap (requires apply=true)
 *   ?chunkSize=N  — rows per chunk (default: 50000)
 */

import { getConnection } from '@commtool/sql-query';
import crypto from 'crypto';
import { errorLoggerUpdate } from '../../utils/requestLogger.js';
import { sendProgressStatus } from '../../server.ws.js';

/**
 * ValidUntil value that marks a current (non-history) row, in epoch milliseconds.
 * The string carries no zone designator, so `Date` parses it as *local* time
 * (and keeps only millisecond precision). Comparisons against driver-returned
 * `Date`s therefore assume the driver interprets TIMESTAMPs in the same local
 * zone — NOTE(review): confirm against the connection's timezone setting.
 */
const INFINITY_MS = new Date('2106-02-07 06:28:15.999999').getTime();

/** Default number of rows fetched per SELECT chunk (overridable via ?chunkSize=N). */
const DEFAULT_CHUNK_SIZE = 50000;

/**
 * Tables to compress with their entity keys (columns that define a unique entity).
 * Frozen because this is shared, read-only configuration.
 */
const TABLES = Object.freeze([
    Object.freeze({ name: 'ObjectBase', entityKeys: Object.freeze(['UID']) }),
    Object.freeze({ name: 'Links', entityKeys: Object.freeze(['UID', 'Type', 'UIDTarget']) }),
]);

/**
 * Normalise a single column value for fingerprinting.
 * SQL NULL (null/undefined) stays `null` so it can never equal an empty string.
 *
 * @param {unknown} value
 * @returns {string|null}
 */
function normalizeHashValue(value) {
    if (value === null || value === undefined) return null;
    if (Buffer.isBuffer(value)) return value.toString('hex');
    if (value instanceof Date) return value.toISOString();
    if (typeof value === 'object') return JSON.stringify(value);
    return String(value);
}

/**
 * Compute a fingerprint of the non-key, non-temporal, non-virtual columns of a row.
 * Two rows of the same entity with equal fingerprints carry identical data and are
 * candidates for squeezing.
 *
 * The normalised values are serialised as a JSON array, so (a) NULL and '' produce
 * different fingerprints and (b) separator characters inside a value cannot make
 * two different rows collide.
 *
 * @param {Object} row - Row object as returned by the driver
 * @param {Set<string>} entityKeySet - Entity key columns (excluded from the fingerprint)
 * @param {Array<{Field: string, isVirtual: boolean}>} allColumns - Column descriptors
 * @returns {string} Hex MD5 digest (used for equality checks only, not for security)
 */
function getHash(row, entityKeySet, allColumns) {
    const parts = [];
    for (const { Field: field, isVirtual } of allColumns) {
        if (isVirtual || entityKeySet.has(field) || field === 'ValidFrom' || field === 'ValidUntil') {
            continue;
        }
        parts.push(normalizeHashValue(row[field]));
    }
    return crypto.createHash('md5').update(JSON.stringify(parts)).digest('hex');
}

/**
 * Squeeze consecutive duplicate history rows for a single entity.
 *
 * Rows are visited in the given order (expected: ascending ValidFrom). A row is
 * merged into the previously kept row when all of the following hold:
 *   - it is not the current row (ValidUntil at the INFINITY_MS marker),
 *   - its data fingerprint (getHash) equals that of the previously kept row,
 *   - its period starts exactly where the kept row's period ends.
 * Merging extends the kept row's ValidUntil to the merged row's ValidUntil. The
 * entity therefore stays visible to `FOR SYSTEM_TIME AS OF` queries across the whole
 * span; simply dropping the row would leave a hole in the history. Duplicates
 * separated by a gap are kept, because the gap is itself history (the entity did not
 * exist then). The first row and the current row are always kept.
 *
 * The input array and its row objects are not mutated; merged rows are shallow copies.
 *
 * @param {Array<Object>} rows - All history rows for one entity, sorted by ValidFrom
 * @param {Set<string>} entityKeySet
 * @param {Array<{Field: string, isVirtual: boolean}>} allColumns
 * @returns {Array<Object>}
 */
function squeeze(rows, entityKeySet, allColumns) {
    if (rows.length <= 1) return rows;

    const toMillis = (value) => (value instanceof Date ? value : new Date(value)).getTime();
    const result = [];
    let prevHash = null;

    for (const row of rows) {
        const vu = row.ValidUntil instanceof Date ? row.ValidUntil.getTime() : 0;
        const isCurrent = vu >= INFINITY_MS - 1000;
        const hash = getHash(row, entityKeySet, allColumns);
        const last = result[result.length - 1];

        const canMerge = last !== undefined
            && !isCurrent
            && hash === prevHash
            && toMillis(last.ValidUntil) === toMillis(row.ValidFrom);

        if (canMerge) {
            // Same data, adjacent period: stretch the kept row instead of dropping this one.
            result[result.length - 1] = { ...last, ValidUntil: row.ValidUntil };
        } else {
            result.push(row);
            prevHash = hash;
        }
    }
    return result;
}

/**
 * Matches the SHOW COLUMNS `Extra` markers of computed columns (e.g. "VIRTUAL GENERATED",
 * "STORED GENERATED", "PERSISTENT"). Word boundaries keep MySQL's "DEFAULT_GENERATED"
 * from matching: that marker denotes an expression default, and such a column is insertable.
 */
const GENERATED_COLUMN_PATTERN = /\b(?:VIRTUAL|PERSISTENT|GENERATED)\b/i;

/**
 * Flag which columns are VIRTUAL / GENERATED (these are excluded from INSERT).
 * Every input column is returned, in order, with a boolean `isVirtual`.
 *
 * @param {Array<{Field: string, Extra: string}>} columns - Result of SHOW COLUMNS
 * @returns {Array<{Field: string, isVirtual: boolean}>}
 */
function getNonVirtualColumns(columns) {
    return columns.map(({ Field, Extra }) => ({
        Field,
        isVirtual: GENERATED_COLUMN_PATTERN.test(Extra ?? ''),
    }));
}

/**
 * Quote a MariaDB identifier with backticks (embedded backticks are doubled).
 *
 * @param {string} identifier
 * @returns {string}
 */
function quoteIdentifier(identifier) {
    const escaped = String(identifier).replace(/`/g, '``');
    return `\`${escaped}\``;
}

/**
 * Normalise a driver query result to its row array. A raw mariadb connection resolves
 * to the rows themselves; mysql2-style APIs resolve to `[rows, fields]`.
 *
 * @param {any} result
 * @returns {Array<Object>}
 */
function rowsOf(result) {
    return Array.isArray(result) && Array.isArray(result[0]) ? result[0] : result;
}

/**
 * Run a `SELECT COUNT(*) AS cnt …` query and return the count as a Number.
 * The driver may deliver COUNT(*) as BigInt, which breaks Math.max, arithmetic
 * with plain numbers, and JSON serialisation.
 *
 * @param {import('mariadb').Connection} connection
 * @param {string} sql
 * @returns {Promise<number>}
 */
async function countRows(connection, sql) {
    const rows = rowsOf(await connection.query(sql));
    return Number(rows[0]?.cnt ?? 0);
}

/**
 * @param {Date|string|number} value - Temporal value as returned by the driver
 * @returns {Date}
 */
function toDate(value) {
    return value instanceof Date ? value : new Date(value);
}

/**
 * True when the row's ValidUntil sits at the INFINITY_MS marker, i.e. it is the current row.
 *
 * @param {Object} row
 * @returns {boolean}
 */
function isCurrentRow(row) {
    const validUntilMs = row.ValidUntil instanceof Date ? row.ValidUntil.getTime() : 0;
    return validUntilMs >= INFINITY_MS - 1000;
}

/**
 * Number of rows an INSERT reported as written (0 when INSERT IGNORE skipped it).
 * Accepts a mariadb OkPacket or a mysql2 `[header, fields]` pair; falls back to 1
 * when the driver reports nothing.
 *
 * @param {any} result
 * @returns {number}
 */
function affectedRowsOf(result) {
    const header = Array.isArray(result) ? result[0] : result;
    const affected = header?.affectedRows;
    return affected === undefined || affected === null ? 1 : Number(affected);
}

/**
 * Group rows by entity key, preserving input order inside each group. Keys are
 * JSON-encoded so separator characters inside key values cannot merge two entities.
 *
 * @param {Array<Object>} rows
 * @param {string[]} entityKeys
 * @returns {Map<string, Array<Object>>}
 */
function groupByEntity(rows, entityKeys) {
    const groups = new Map();
    for (const row of rows) {
        const key = JSON.stringify(entityKeys.map(k => {
            const v = row[k];
            return v instanceof Buffer ? v.toString('hex') : String(v);
        }));
        const group = groups.get(key);
        if (group) {
            group.push(row);
        } else {
            groups.set(key, [row]);
        }
    }
    return groups;
}

/**
 * Insert a current row as a normal versioned write. History-insert mode is switched off
 * and the session clock is pinned to the row's original ValidFrom for the duration.
 * Fractional seconds keep the driver's millisecond precision. Truncating to whole
 * seconds would start the current row up to a second before the previous history
 * row ends, producing overlapping periods.
 *
 * @returns {Promise<number>} Rows written (see affectedRowsOf)
 */
async function insertCurrentRow(connection, sql, values, validFrom) {
    const timestamp = (toDate(validFrom).getTime() / 1000).toFixed(3);
    await connection.query('SET SESSION system_versioning_insert_history = OFF');
    await connection.query(`SET @@timestamp = ${timestamp}`);
    const result = await connection.query(sql, values);
    await connection.query('SET @@timestamp = DEFAULT');
    await connection.query('SET SESSION system_versioning_insert_history = ON');
    return affectedRowsOf(result);
}

/**
 * Restore the session settings compressTable changes, so the pooled connection is not
 * released with a pinned clock or history-insert mode still on. Failures are logged,
 * not thrown, so cleanup never masks the error that triggered it.
 *
 * @param {import('mariadb').Connection} connection
 */
async function restoreSessionSettings(connection) {
    try {
        await connection.query('SET @@timestamp = DEFAULT');
        await connection.query('SET SESSION system_versioning_insert_history = DEFAULT');
    } catch (e) {
        errorLoggerUpdate(e);
    }
}

/**
 * Compress a single system-versioned table into `{tableName}_new`.
 *
 * Reads the full history (`FOR SYSTEM_TIME ALL`) in deterministically ordered chunks
 * and groups rows per entity. It drops empty periods (ValidFrom >= ValidUntil),
 * squeezes consecutive duplicates, and — unless `dryRun` — re-inserts the survivors:
 * history rows with explicit ValidFrom/ValidUntil, current rows via a pinned session
 * clock. Session settings changed along the way are restored before returning or throwing.
 *
 * @param {string} tableName
 * @param {string[]} entityKeys
 * @param {number} chunkSize
 * @param {boolean} dryRun
 * @param {import('mariadb').Connection} connection - Raw mariadb connection
 * @param {(status: Object) => void} onProgress
 * @returns {Promise<{squeezed: number, inserted: number, originalTotal: number, newTotal: number}>}
 *   `squeezed` counts dropped rows (empty periods plus merged duplicates). `inserted`
 *   counts rows the server reported as written. `newTotal` is 0 in dry-run mode or
 *   when nothing was inserted.
 */
async function compressTable(tableName, entityKeys, chunkSize, dryRun, connection, onProgress) {
    const entityKeySet = new Set(entityKeys);
    const sourceTable = quoteIdentifier(tableName);
    const targetTable = quoteIdentifier(`${tableName}_new`);

    const allColumns = getNonVirtualColumns(rowsOf(await connection.query(`SHOW COLUMNS FROM ${sourceTable}`)));
    // Period columns are never copied as plain data: history rows receive them
    // explicitly, current rows get ValidFrom from the pinned session clock.
    const dataCols = allColumns
        .filter(c => !c.isVirtual && c.Field !== 'ValidFrom' && c.Field !== 'ValidUntil')
        .map(c => c.Field);
    const buildInsert = (cols) =>
        `INSERT IGNORE INTO ${targetTable} (${cols.map(c => quoteIdentifier(c)).join(', ')}) VALUES (${cols.map(() => '?').join(', ')})`;
    const insertCurrentSql = buildInsert(dataCols);
    const insertHistorySql = buildInsert([...dataCols, 'ValidFrom', 'ValidUntil']);
    // Order by the complete entity key plus both period bounds. Under a partial ORDER BY,
    // ties may come back in a different order per query, so LIMIT/OFFSET paging
    // would skip or repeat rows.
    const orderBy = [...entityKeys, 'ValidFrom', 'ValidUntil'].map(c => quoteIdentifier(c)).join(', ');

    onProgress({
        action: 'progress',
        text: `Processing ${tableName}…`,
        progress: 0
    });

    const curr = await countRows(connection, `SELECT COUNT(*) AS cnt FROM ${sourceTable}`);
    const hist = await countRows(connection, `SELECT COUNT(*) AS cnt FROM ${sourceTable} FOR SYSTEM_TIME ALL`);
    onProgress({
        action: 'progress',
        text: `${tableName}: ${curr} current rows, ${hist} total (${hist - curr} history rows)`,
        progress: 5
    });

    if (!dryRun) {
        await connection.query(`DROP TABLE IF EXISTS ${targetTable}`);
        await connection.query(`CREATE TABLE ${targetTable} LIKE ${sourceTable}`);
        await connection.query('SET SESSION system_versioning_insert_history = ON');
        onProgress({
            action: 'progress',
            text: `Created ${tableName}_new, enabled system_versioning_insert_history`,
            progress: 10
        });
    }

    let offset = 0;
    let totalRowsProcessed = 0;
    let totalSqueezed = 0;
    let totalInserted = 0;

    try {
        for (;;) {
            const rows = rowsOf(await connection.query(
                `SELECT * FROM ${sourceTable} FOR SYSTEM_TIME ALL ORDER BY ${orderBy} LIMIT ${chunkSize} OFFSET ${offset}`
            ));
            if (rows.length === 0) break;

            for (const groupRows of groupByEntity(rows, entityKeys).values()) {
                // Discard empty periods *before* squeezing. Otherwise an identical row
                // right after an empty one would be squeezed away as its duplicate and
                // then the empty row skipped, losing that data entirely.
                const validRows = groupRows.filter(row => isCurrentRow(row)
                    || !(toDate(row.ValidFrom).getTime() >= toDate(row.ValidUntil).getTime()));
                const keptRows = squeeze(validRows, entityKeySet, allColumns);
                totalSqueezed += groupRows.length - keptRows.length;

                if (dryRun) continue;

                for (const row of keptRows) {
                    const dataVals = dataCols.map(c => row[c]);
                    if (isCurrentRow(row)) {
                        totalInserted += await insertCurrentRow(connection, insertCurrentSql, dataVals, row.ValidFrom);
                    } else {
                        const result = await connection.query(
                            insertHistorySql,
                            [...dataVals, toDate(row.ValidFrom), toDate(row.ValidUntil)]
                        );
                        totalInserted += affectedRowsOf(result);
                    }
                }
            }

            totalRowsProcessed += rows.length;
            offset += rows.length;

            onProgress({
                action: 'progress',
                text: `${tableName}: processed ${totalRowsProcessed} rows, squeezed: ${totalSqueezed}, inserted: ${totalInserted}`,
                current: totalRowsProcessed,
                total: Math.max(hist, 1),
                progress: Math.min(10 + Math.round((totalRowsProcessed / Math.max(hist, 1)) * 80), 90)
            });

            // A short chunk is the last one; skip the extra empty query.
            if (rows.length < chunkSize) break;
        }
    } finally {
        if (!dryRun) await restoreSessionSettings(connection);
    }

    let newHist = 0;
    if (!dryRun && totalInserted > 0) {
        newHist = await countRows(connection, `SELECT COUNT(*) AS cnt FROM ${targetTable} FOR SYSTEM_TIME ALL`);
    }

    onProgress({
        action: 'progress',
        text: `${tableName}: squeezed ${totalSqueezed} rows, inserted ${totalInserted}` +
            (newHist > 0 ? ` (${hist} → ${newHist}, reduction: ${hist - newHist})` : ''),
        progress: 95
    });

    return { squeezed: totalSqueezed, inserted: totalInserted, originalTotal: hist, newTotal: newHist };
}

/**
 * Compress history for all configured system-versioned tables.
 *
 * GET /maintenance/compressHistory
 *
 * Query params:
 *   ?apply=true   — actually create _new tables and compress (default: dry-run, no writes)
 *   ?swap=true    — also perform the RENAME TABLE swap (requires apply=true)
 *   ?chunkSize=N  — rows per chunk (default: 50000); must be a positive integer (400 otherwise)
 *
 * The swap first verifies that every `_new` table is non-empty. It then drops stale
 * `_oldbak` backups and renames all tables in one RENAME TABLE statement, so a failing
 * check cannot leave some tables swapped and others not.
 *
 * @param {import('../../types.js').ExpressRequestAuthorized} req
 * @param {import('../../types.js').ExpressResponse} res
 */
export const compressHistory = async (req, res) => {
    const dryRun = req.query.apply !== 'true';
    const doSwap = req.query.swap === 'true';
    const chunkSize = Number.parseInt(req.query.chunkSize || String(DEFAULT_CHUNK_SIZE), 10);

    // chunkSize is interpolated into LIMIT: NaN would be a SQL error, and a value < 1
    // reads nothing and leaves empty _new tables behind.
    if (!Number.isSafeInteger(chunkSize) || chunkSize < 1) {
        res.status(400).json({ success: false, error: 'chunkSize must be a positive integer' });
        return;
    }

    sendProgressStatus(req.session.root, {
        action: 'start',
        text: `Starting history compression${dryRun ? ' (DRY RUN)' : ''}…`,
        progress: 0
    });

    let connection;
    try {
        connection = await getConnection();

        const results = [];

        for (const { name: tableName, entityKeys } of TABLES) {
            const result = await compressTable(
                tableName, entityKeys, chunkSize, dryRun, connection,
                (status) => sendProgressStatus(req.session.root, status)
            );
            results.push({ table: tableName, ...result });
        }

        if (!dryRun && doSwap) {
            sendProgressStatus(req.session.root, {
                action: 'progress',
                text: 'Performing RENAME TABLE swap…',
                progress: 95
            });

            // Verify every _new table before touching any live table.
            for (const { name: tableName } of TABLES) {
                const countResult = await connection.query(`SELECT COUNT(*) AS cnt FROM \`${tableName}_new\``);
                // A raw mariadb connection resolves to the rows; mysql2-style APIs to [rows, fields].
                const countResultRows = Array.isArray(countResult[0]) ? countResult[0] : countResult;
                // COUNT(*) may arrive as BigInt, for which `=== 0` is never true.
                if (Number(countResultRows[0]?.cnt ?? 0) === 0) {
                    throw new Error(`Cannot swap: ${tableName}_new is empty`);
                }
            }

            for (const { name: tableName } of TABLES) {
                await connection.query(`DROP TABLE IF EXISTS \`${tableName}_oldbak\``);
            }

            // One multi-table RENAME: all tables are swapped together rather than one by one.
            const renamePairs = TABLES.map(({ name: tableName }) =>
                `\`${tableName}\` TO \`${tableName}_oldbak\`, \`${tableName}_new\` TO \`${tableName}\``
            );
            await connection.query(`RENAME TABLE ${renamePairs.join(', ')}`);

            sendProgressStatus(req.session.root, {
                action: 'progress',
                text: 'RENAME TABLE swap completed',
                progress: 98
            });
        }

        for (const r of results) {
            const reduction = r.originalTotal - r.newTotal;
            sendProgressStatus(req.session.root, {
                action: 'progress',
                text: `${r.table}: ${r.squeezed} history rows compressed, inserted ${r.inserted}` +
                    (r.newTotal > 0 ? ` (reduction: ${reduction} rows: ${r.originalTotal} → ${r.newTotal})` : ''),
                progress: 99
            });
        }

        sendProgressStatus(req.session.root, {
            action: 'finished',
            text: `History compression completed${dryRun ? ' (DRY RUN — no changes made)' : doSwap ? ' (swap performed)' : ' (_new tables created)'}`,
            progress: 100
        });

        res.json({
            success: true,
            dryRun,
            swapPerformed: !dryRun && doSwap,
            results
        });

    } catch (e) {
        sendProgressStatus(req.session.root, {
            action: 'error',
            text: `Error: ${e.message}`
        });
        errorLoggerUpdate(e);
        res.status(500).json({ success: false, error: e.message });
    } finally {
        if (connection) {
            // Best effort: a failed release must not replace the response already sent.
            try { connection.release(); } catch (_) { /* ignore */ }
        }
    }
};