// @ts-check
/**
* compressHistory.js — History compression for system-versioned tables
*
* Adapted from compress_v2.mjs for the members-back maintenance API.
*
* Approach:
* 1. CREATE TABLE {table}_new LIKE {table}
* 2. Chunk-wise SELECT FOR SYSTEM_TIME ALL, group by entity keys,
* squeeze consecutive duplicate history rows
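* 3. INSERT IGNORE into {table}_new: history rows with explicit ValidFrom/ValidUntil
*    (system_versioning_insert_history = ON), current rows with @@timestamp pinned to their ValidFrom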
* 4. Skip invalid ranges (ValidFrom >= ValidUntil)
* 5. RENAME TABLE swap (one statement, at end — only with ?swap=true)
*
* Query params:
* ?apply=true — actually create _new tables and compress (default: dry-run)
* ?swap=true — also perform the RENAME TABLE swap (requires apply=true)
* ?chunkSize=N — rows per chunk (default: 50000)
*/
import { getConnection } from '@commtool/sql-query';
import crypto from 'crypto';
import { errorLoggerUpdate } from '../../utils/requestLogger.js';
import { sendProgressStatus } from '../../server.ws.js';
/**
 * Epoch milliseconds of the ValidUntil value that marks a row as current.
 * Presumably the largest TIMESTAMP the MariaDB server stores in the row-end
 * column. The string carries no UTC offset, so V8 parses it in the Node
 * process's local time zone.
 * NOTE(review): assumes the driver decodes TIMESTAMPs in that same zone and
 * that the server uses the 2106 upper bound. Current-row checks in this
 * module allow 1000 ms of slack below this value.
 */
const INFINITY_MS = Date.parse('2106-02-07 06:28:15.999999');

/** Rows fetched per SELECT page when the request gives no ?chunkSize. */
const DEFAULT_CHUNK_SIZE = 50000;

/**
 * System-versioned tables to compress. For each table, `entityKeys` lists
 * the columns that together identify one logical entity. History rows are
 * grouped and squeezed per distinct combination of these keys.
 */
const TABLES = [
  ['ObjectBase', ['UID']],
  ['Links', ['UID', 'Type', 'UIDTarget']],
].map(([name, entityKeys]) => ({ name, entityKeys }));
/**
 * Compute a fingerprint of a row's data columns.
 *
 * Entity-key columns, the period columns (ValidFrom / ValidUntil) and virtual
 * columns are ignored. Two versions of the same entity therefore hash equal
 * exactly when every remaining stored column has the same normalized value.
 *
 * Normalization: NULL/undefined → null, Buffer → hex, Date → ISO string
 * (so sub-millisecond differences are invisible), other objects → JSON,
 * everything else → String.
 *
 * The value list is JSON-encoded before hashing. This keeps NULL distinct
 * from '' and prevents a '|' inside a value from shifting column boundaries.
 * A plain `join('|')` would make ['a|b', 'c'] and ['a', 'b|c'] collide and
 * silently merge genuinely different versions.
 *
 * @param {Object} row
 * @param {Set<string>} entityKeySet
 * @param {Array<{Field: string, isVirtual: boolean}>} allColumns
 * @returns {string} hex MD5 digest
 */
function getHash(row, entityKeySet, allColumns) {
  const values = [];
  for (const { Field, isVirtual } of allColumns) {
    if (isVirtual || entityKeySet.has(Field) || Field === 'ValidFrom' || Field === 'ValidUntil') {
      continue;
    }
    const v = row[Field];
    if (v === null || v === undefined) {
      values.push(null);
    } else if (v instanceof Buffer) {
      values.push(v.toString('hex'));
    } else if (v instanceof Date) {
      values.push(v.toISOString());
    } else if (typeof v === 'object') {
      values.push(JSON.stringify(v));
    } else {
      values.push(String(v));
    }
  }
  return crypto.createHash('md5').update(JSON.stringify(values)).digest('hex');
}
/**
 * Squeeze consecutive duplicate history rows for a single entity.
 *
 * Walks the rows in order and keeps:
 * - the first row,
 * - every row whose data hash differs from the last kept row,
 * - every current row (ValidUntil within 1000 ms of INFINITY_MS).
 *
 * A non-current row with the same hash as the last kept row is folded into
 * that row by extending its ValidUntil. This happens only when the two
 * periods are contiguous (kept.ValidUntil equals row.ValidFrom).
 * - Dropping a duplicate without extending ValidUntil would leave a hole
 *   in which `FOR SYSTEM_TIME AS OF` finds no row.
 * - Merging across a gap (e.g. a delete followed by re-insert of identical
 *   data) would invent existence during the gap.
 *
 * Input rows are never mutated; a merged row is a shallow copy.
 *
 * @param {Array<Object>} rows - All history rows for one entity, sorted by ValidFrom
 * @param {Set<string>} entityKeySet
 * @param {Array<{Field: string, isVirtual: boolean}>} allColumns
 * @returns {Array<Object>}
 */
function squeeze(rows, entityKeySet, allColumns) {
  if (rows.length <= 1) return rows;

  // Dates are compared by instant; any other representation by strict equality.
  const sameInstant = (a, b) =>
    a instanceof Date && b instanceof Date ? a.getTime() === b.getTime() : a === b;

  const result = [];
  let prevHash = null;
  for (const row of rows) {
    const vu = row.ValidUntil instanceof Date ? row.ValidUntil.getTime() : 0;
    const isCurrent = vu >= INFINITY_MS - 1000;
    const hash = getHash(row, entityKeySet, allColumns);
    const last = result[result.length - 1];
    if (last !== undefined && !isCurrent && hash === prevHash && sameInstant(last.ValidUntil, row.ValidFrom)) {
      // Same data, adjacent period: stretch the kept version instead of leaving a hole.
      result[result.length - 1] = { ...last, ValidUntil: row.ValidUntil };
      continue;
    }
    result.push(row);
    prevHash = hash;
  }
  return result;
}
/**
 * Annotate SHOW COLUMNS output with whether each column is generated.
 *
 * Despite the name, every column is returned. A column whose `Extra` text
 * contains VIRTUAL or GENERATED (matched case-insensitively) is flagged
 * `isVirtual: true`, so callers can leave it out of INSERT column lists and
 * data hashes. `isVirtual` is always a real boolean, even when `Extra` is
 * empty or missing.
 *
 * @param {Array<{Field: string, Extra: string}>} columns - Result rows of SHOW COLUMNS
 * @returns {Array<{Field: string, isVirtual: boolean}>}
 */
function getNonVirtualColumns(columns) {
  return columns.map(({ Field, Extra }) => {
    const extra = String(Extra || '').toUpperCase();
    return {
      Field,
      isVirtual: extra.includes('VIRTUAL') || extra.includes('GENERATED'),
    };
  });
}
/**
 * Quote a MariaDB identifier with backticks, doubling any embedded backtick.
 *
 * @param {string} name
 * @returns {string}
 */
function quoteIdent(name) {
  const escaped = String(name).split('`').join('``');
  return `\`${escaped}\``;
}

/**
 * Run a read query and return its result rows.
 *
 * Accepts a bare row array (the mariadb driver's shape) as well as a
 * `[rows, fields]` tuple (mysql2-style wrappers). Callers therefore do not
 * depend on which shape the connection from `getConnection()` produces.
 *
 * @param {import('mariadb').Connection} connection
 * @param {string} sql
 * @returns {Promise<Array<Object>>}
 */
async function queryRows(connection, sql) {
  const result = await connection.query(sql);
  return Array.isArray(result) && Array.isArray(result[0]) ? result[0] : result;
}

/**
 * Run a `SELECT COUNT(*) AS cnt …` query and return the count as a Number.
 * The driver may deliver COUNT(*) as a BigInt. That would break Math.max,
 * `=== 0` comparisons and JSON serialization of the results.
 *
 * @param {import('mariadb').Connection} connection
 * @param {string} sql
 * @returns {Promise<number>}
 */
async function countRows(connection, sql) {
  const rows = await queryRows(connection, sql);
  return rows.length > 0 ? Number(rows[0].cnt) : 0;
}

/**
 * Coerce a temporal column value to a Date.
 *
 * @param {Date|string|number} value
 * @returns {Date}
 */
function toDate(value) {
  return value instanceof Date ? value : new Date(value);
}

/**
 * True when the row is the entity's current version, i.e. its ValidUntil is
 * a Date within 1000 ms of INFINITY_MS. Non-Date ValidUntil values count as history.
 *
 * @param {Object} row
 * @returns {boolean}
 */
function isCurrentRow(row) {
  const vu = row.ValidUntil instanceof Date ? row.ValidUntil.getTime() : 0;
  return vu >= INFINITY_MS - 1000;
}

/**
 * True for a history row whose period is empty or inverted
 * (ValidFrom >= ValidUntil). Such rows are never copied.
 *
 * @param {Object} row
 * @returns {boolean}
 */
function hasEmptyRange(row) {
  return !isCurrentRow(row) && toDate(row.ValidFrom).getTime() >= toDate(row.ValidUntil).getTime();
}

/**
 * Insert a current row so that MariaDB itself stamps ValidFrom with the
 * original row's ValidFrom.
 *
 * How: explicit history inserts are turned off and @@timestamp is pinned to
 * ValidFrom. The timestamp carries milliseconds (all a JS Date holds) rather
 * than whole seconds; flooring to seconds would shift ValidFrom earlier than
 * the preceding history row's ValidUntil.
 *
 * Both session settings are restored in `finally`. If a failed INSERT left
 * @@timestamp pinned on a pooled connection, NOW() would stay frozen for
 * every later user of that connection.
 *
 * @param {import('mariadb').Connection} connection
 * @param {string} sql - Parameterized INSERT without the period columns
 * @param {Array<any>} values
 * @param {Date|string} validFrom
 * @returns {Promise<void>}
 */
async function insertCurrentRow(connection, sql, values, validFrom) {
  const ts = (toDate(validFrom).getTime() / 1000).toFixed(3);
  await connection.query('SET SESSION system_versioning_insert_history = OFF');
  try {
    await connection.query(`SET @@timestamp = ${ts}`);
    await connection.query(sql, values);
  } finally {
    await connection.query('SET @@timestamp = DEFAULT');
    await connection.query('SET SESSION system_versioning_insert_history = ON');
  }
}

/**
 * Compress a single system-versioned table.
 *
 * Steps:
 * 1. Reads all rows FOR SYSTEM_TIME ALL in LIMIT/OFFSET pages and groups
 *    them per entity.
 * 2. Drops rows with empty/inverted periods and squeezes duplicate versions.
 * 3. Unless `dryRun`, copies the survivors into a freshly created
 *    `{tableName}_new`:
 *    - history rows with explicit ValidFrom/ValidUntil,
 *    - current rows via a pinned @@timestamp.
 *
 * `system_versioning_insert_history` is reset to its default before
 * returning, even on error.
 *
 * @param {string} tableName
 * @param {string[]} entityKeys
 * @param {number} chunkSize - Rows per page; assumed to be a positive integer
 * @param {boolean} dryRun
 * @param {import('mariadb').Connection} connection - Raw mariadb connection
 * @param {(status: Object) => void} onProgress
 * @returns {Promise<{squeezed: number, inserted: number, originalTotal: number, newTotal: number}>}
 */
async function compressTable(tableName, entityKeys, chunkSize, dryRun, connection, onProgress) {
  const entityKeySet = new Set(entityKeys);
  const source = quoteIdent(tableName);
  const target = quoteIdent(`${tableName}_new`);

  const rawColumns = await queryRows(connection, `SHOW COLUMNS FROM ${source}`);
  const allColumns = getNonVirtualColumns(rawColumns);
  // Generated columns are computed by the server and cannot be inserted.
  // The period columns are appended only for history rows.
  const dataCols = allColumns
    .filter(c => !c.isVirtual && c.Field !== 'ValidFrom' && c.Field !== 'ValidUntil')
    .map(c => c.Field);
  const historyCols = [...dataCols, 'ValidFrom', 'ValidUntil'];
  const buildInsert = cols =>
    `INSERT IGNORE INTO ${target} (${cols.map(c => quoteIdent(c)).join(', ')}) ` +
    `VALUES (${cols.map(() => '?').join(', ')})`;
  const currentInsertSql = buildInsert(dataCols);
  const historyInsertSql = buildInsert(historyCols);
  // Ordering by all entity keys plus both period columns is intended to be a
  // total order: MariaDB adds the row-end column to the keys of
  // system-versioned tables (assuming the entity keys are the primary key).
  // A total order means LIMIT/OFFSET pages neither skip nor repeat rows, and
  // each entity's rows arrive together, sorted by ValidFrom.
  const orderBy = [...entityKeys, 'ValidFrom', 'ValidUntil'].map(c => quoteIdent(c)).join(', ');

  onProgress({
    action: 'progress',
    text: `Processing ${tableName}…`,
    progress: 0
  });

  const curr = await countRows(connection, `SELECT COUNT(*) AS cnt FROM ${source}`);
  const hist = await countRows(connection, `SELECT COUNT(*) AS cnt FROM ${source} FOR SYSTEM_TIME ALL`);
  onProgress({
    action: 'progress',
    text: `${tableName}: ${curr} current rows, ${hist} total (${hist - curr} history rows)`,
    progress: 5
  });

  if (!dryRun) {
    await connection.query(`DROP TABLE IF EXISTS ${target}`);
    await connection.query(`CREATE TABLE ${target} LIKE ${source}`);
    await connection.query('SET SESSION system_versioning_insert_history = ON');
    onProgress({
      action: 'progress',
      text: `Created ${tableName}_new, enabled system_versioning_insert_history`,
      progress: 10
    });
  }

  let offset = 0;
  let totalRowsProcessed = 0;
  let totalSqueezed = 0;
  let totalInserted = 0;
  try {
    while (true) {
      const rows = await queryRows(
        connection,
        `SELECT * FROM ${source} FOR SYSTEM_TIME ALL ORDER BY ${orderBy} LIMIT ${chunkSize} OFFSET ${offset}`
      );
      if (rows.length === 0) break;

      // JSON-encoding the key parts avoids collisions from ':' inside key values.
      const groups = new Map();
      for (const row of rows) {
        const key = JSON.stringify(entityKeys.map(k => {
          const v = row[k];
          return v instanceof Buffer ? v.toString('hex') : String(v);
        }));
        if (!groups.has(key)) groups.set(key, []);
        groups.get(key).push(row);
      }

      for (const groupRows of groups.values()) {
        // Drop empty/inverted periods before squeezing, for two reasons:
        // - such a row must not hide a valid identical version from the
        //   duplicate check;
        // - the dry-run count must match what apply mode actually skips.
        const validRows = groupRows.filter(row => !hasEmptyRange(row));
        const kept = squeeze(validRows, entityKeySet, allColumns);
        totalSqueezed += groupRows.length - kept.length;
        if (dryRun) continue;

        for (const row of kept) {
          const dataVals = dataCols.map(c => row[c]);
          if (isCurrentRow(row)) {
            await insertCurrentRow(connection, currentInsertSql, dataVals, row.ValidFrom);
          } else {
            await connection.query(
              historyInsertSql,
              [...dataVals, toDate(row.ValidFrom), toDate(row.ValidUntil)]
            );
          }
          totalInserted++;
        }
      }

      totalRowsProcessed += rows.length;
      offset += rows.length;
      onProgress({
        action: 'progress',
        text: `${tableName}: processed ${totalRowsProcessed} rows, squeezed: ${totalSqueezed}, inserted: ${totalInserted}`,
        current: totalRowsProcessed,
        total: Math.max(hist, 1),
        progress: Math.min(10 + Math.round((totalRowsProcessed / Math.max(hist, 1)) * 80), 90)
      });
      // A short page is the last one; skip the extra (expensive) empty OFFSET query.
      if (rows.length < chunkSize) break;
    }
  } finally {
    if (!dryRun) {
      // Don't hand the (pooled) connection back with explicit history inserts enabled.
      await connection.query('SET SESSION system_versioning_insert_history = DEFAULT');
    }
  }

  let newHist = 0;
  if (!dryRun && totalInserted > 0) {
    newHist = await countRows(connection, `SELECT COUNT(*) AS cnt FROM ${target} FOR SYSTEM_TIME ALL`);
  }
  onProgress({
    action: 'progress',
    text: `${tableName}: squeezed ${totalSqueezed} rows, inserted ${totalInserted}` +
      (newHist > 0 ? ` (${hist} → ${newHist}, reduction: ${hist - newHist})` : ''),
    progress: 95
  });
  return { squeezed: totalSqueezed, inserted: totalInserted, originalTotal: hist, newTotal: newHist };
}
/**
 * Compress history for all configured system-versioned tables.
 *
 * GET /maintenance/compressHistory
 *
 * Query params:
 * ?apply=true — actually create _new tables and compress (default: dry-run, no writes)
 * ?swap=true — also perform the RENAME TABLE swap (requires apply=true)
 * ?chunkSize=N — rows per chunk (default: 50000). Must be a positive integer;
 *   otherwise the request is rejected with HTTP 400.
 *
 * Progress is pushed to the caller's session via sendProgressStatus. The HTTP
 * response carries the per-table results.
 *
 * Swap behavior:
 * - Every `{table}_new` is checked for emptiness before anything is dropped
 *   or renamed.
 * - All tables are then swapped with a single RENAME TABLE statement, so a
 *   failure cannot leave the schema half-swapped.
 *
 * NOTE(review): rows written to the live tables after they were copied are
 * not in the _new tables and would disappear on swap. Presumably this runs
 * while writes are paused; confirm.
 *
 * @param {import('../../types.js').ExpressRequestAuthorized} req
 * @param {import('../../types.js').ExpressResponse} res
 */
export const compressHistory = async (req, res) => {
  const dryRun = req.query.apply !== 'true';
  const doSwap = req.query.swap === 'true';
  const rawChunkSize = req.query.chunkSize || String(DEFAULT_CHUNK_SIZE);
  const chunkSize = Number.parseInt(String(rawChunkSize), 10);
  // chunkSize is interpolated into LIMIT:
  // - NaN would be an SQL error;
  // - values <= 0 would silently copy nothing.
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    res.status(400).json({ success: false, error: `Invalid chunkSize: ${rawChunkSize}` });
    return;
  }

  sendProgressStatus(req.session.root, {
    action: 'start',
    text: `Starting history compression${dryRun ? ' (DRY RUN)' : ''}…`,
    progress: 0
  });

  let connection;
  try {
    connection = await getConnection();
    const results = [];
    for (const { name: tableName, entityKeys } of TABLES) {
      const result = await compressTable(
        tableName, entityKeys, chunkSize, dryRun, connection,
        (status) => sendProgressStatus(req.session.root, status)
      );
      results.push({ table: tableName, ...result });
    }

    if (!dryRun && doSwap) {
      sendProgressStatus(req.session.root, {
        action: 'progress',
        text: 'Performing RENAME TABLE swap…',
        progress: 95
      });
      // Validate every _new table first so no table is touched if any check fails.
      for (const { name: tableName } of TABLES) {
        const result = await connection.query(`SELECT COUNT(*) AS cnt FROM \`${tableName}_new\``);
        // Accept both a bare row array and a [rows, fields] tuple; COUNT(*) may be a BigInt.
        const rows = Array.isArray(result[0]) ? result[0] : result;
        const cnt = rows.length > 0 ? Number(rows[0].cnt) : 0;
        if (cnt === 0) {
          throw new Error(`Cannot swap: ${tableName}_new is empty`);
        }
      }
      for (const { name: tableName } of TABLES) {
        await connection.query(`DROP TABLE IF EXISTS \`${tableName}_oldbak\``);
      }
      // One statement renames every pair, so all tables switch together.
      const renames = TABLES
        .map(({ name }) => `\`${name}\` TO \`${name}_oldbak\`, \`${name}_new\` TO \`${name}\``)
        .join(', ');
      await connection.query(`RENAME TABLE ${renames}`);
      sendProgressStatus(req.session.root, {
        action: 'progress',
        text: 'RENAME TABLE swap completed',
        progress: 98
      });
    }

    for (const r of results) {
      const reduction = r.originalTotal - r.newTotal;
      sendProgressStatus(req.session.root, {
        action: 'progress',
        text: `${r.table}: ${r.squeezed} history rows compressed, inserted ${r.inserted}` +
          (r.newTotal > 0 ? ` (reduction: ${reduction} rows: ${r.originalTotal} → ${r.newTotal})` : ''),
        progress: 99
      });
    }
    sendProgressStatus(req.session.root, {
      action: 'finished',
      text: `History compression completed${dryRun ? ' (DRY RUN — no changes made)' : doSwap ? ' (swap performed)' : ' (_new tables created)'}`,
      progress: 100
    });
    res.json({
      success: true,
      dryRun,
      swapPerformed: !dryRun && doSwap,
      results
    });
  } catch (e) {
    sendProgressStatus(req.session.root, {
      action: 'error',
      text: `Error: ${e.message}`
    });
    errorLoggerUpdate(e);
    res.status(500).json({ success: false, error: e.message });
  } finally {
    if (connection) {
      try {
        // release() may return a promise; awaiting it lets a rejection land in this catch.
        await connection.release();
      } catch (releaseError) {
        // Best-effort cleanup: record the failure instead of hiding it.
        errorLoggerUpdate(releaseError);
      }
    }
  }
};