Source: tree/treeQueue/treeQueue.js


// @ts-check
/**
 * @import {treeAction} from './../../types.js'
 */

import {query, UUID2hex,HEX2uuid} from '@commtool/sql-query'

import {addUpdateList,updateQueueStatus} from '../../server.ws.js'

import { errorLoggerUpdate } from '../../utils/requestLogger.js'
import { trackCall } from '../../__tests__/.helpers/moduleTracker.js'

/**
 * Per-root "queue is being processed" flags.
 * triggerQueue indexes this record with the value HEX2uuid returns for the root group.
 * @type {Record<string, boolean>}
 */
export const queueRunning = {};

/**
 * Placeholder handler that does nothing. The three action handlers below point to it
 * until triggerQueues() replaces them with the implementations it imports dynamically.
 * @type {function(treeAction): Promise<void>}
 */
const noopAction = async () => {};

/** @type {function(treeAction): Promise<void>} */
let addAction = noopAction;
/** @type {function(treeAction): Promise<void>} */
let removeAction = noopAction;
/** @type {function(treeAction): Promise<void>} */
let migrateAction = noopAction;

/**
 * Triggers the processing of the tree queue for a specific root organization.
 *
 * Reads all TreeQueue rows of the root ordered by qindex, dispatches each one to the
 * migrate / remove / add handler, deletes the row and publishes the remaining queue
 * length via updateQueueStatus. The queue is re-read until it comes back empty, so
 * rows inserted while a run is in progress are handled by the same run.
 *
 * Only one run per root is active at a time (guarded by queueRunning). The guard is
 * released whenever the run ends, also when a handler or a query throws; in that case
 * the failing row has not been deleted yet and is processed again on the next trigger.
 * Errors are passed to errorLoggerUpdate and are not rethrown.
 *
 * @async
 * @function triggerQueue
 * @param {Buffer|string} rootGroup - The UID of the root organization group (Buffer or hex string)
 * @returns {Promise<void>}
 */
export const triggerQueue = async (rootGroup) => {
    try {
        // key used for the running flag and for all status notifications of this root
        const rootKey = HEX2uuid(rootGroup);

        // stub automatic queue execution in test mode so that we can test the queue manually
        if (process.env.NODE_ENV === 'test') {
            trackCall('triggerQueue', rootKey);
            return;
        }
        // prevent 2nd queue start
        if (queueRunning[rootKey]) {
            return;
        }
        queueRunning[rootKey] = true;
        try {
            const treeSelect =
                `SELECT TreeQueue.qindex ,TreeQueue.UIDroot, TreeQueue.UIDuser,TreeQueue.Type,
                TreeQueue.UIDObjectID , TreeQueue.UIDBelongsTo,
                TreeQueue.UIDoldTarget , TreeQueue.UIDnewTarget, UNIX_TIMESTAMP(TreeQueue.timestamp) AS timestamp
                FROM TreeQueue AS TreeQueue
                WHERE TreeQueue.UIDRoot = ?      
                ORDER BY TreeQueue.qindex`;

            let queue = await query(treeSelect, [rootGroup]);
            updateQueueStatus(rootKey, queue.length);
            /** @type {number|undefined} */
            let currentIndex;
            while (queue.length) {
                for (const action of queue) {
                    // skip a row whose qindex equals the one handled just before
                    if (currentIndex === action.qindex) {
                        continue;
                    }
                    currentIndex = action.qindex;

                    // both targets set -> migrate, only old target -> remove, only new target -> add
                    if (action.UIDoldTarget && action.UIDnewTarget && migrateAction) {
                        await migrateAction(action);
                    } else if (action.UIDoldTarget && action.UIDnewTarget === null && removeAction) {
                        await removeAction(action);
                    } else if (action.UIDnewTarget && addAction) {
                        await addAction(action);
                    }

                    // the row is removed only after its handler finished without throwing
                    await query("DELETE FROM TreeQueue WHERE TreeQueue.qindex=?", [action.qindex]);
                    const [{ count }] = await query("SELECT count(*) AS count FROM TreeQueue WHERE  TreeQueue.UIDRoot = ?  ", [rootGroup], { log: false });
                    updateQueueStatus(rootKey, count);
                    // check, if we should update the list
                    if (action.UIDoldTarget) {
                        addUpdateList(action.UIDoldTarget);
                    }
                    if (action.UIDnewTarget) {
                        addUpdateList(action.UIDnewTarget);
                    }
                }
                // pick up rows that were queued while this batch was processed
                queue = await query(treeSelect, [rootGroup]);
            }
        } finally {
            // always release the guard; otherwise one failing action would block
            // every later trigger for this root until the process restarts
            queueRunning[rootKey] = false;
        }
        // mark queue as empty
        updateQueueStatus(rootKey, 0);
    } catch (e) {
        errorLoggerUpdate(e || new Error('triggerQueue: Unknown error occurred'));
    }
};


/**
 * Queues one tree action for a root organization and starts queue processing.
 *
 * The action is written as a single TreeQueue row; afterwards triggerQueue is started
 * for the root without being awaited. Missing root/user and any error raised by the
 * insert are reported through errorLoggerUpdate instead of being thrown to the caller.
 *
 * @async
 * @function queueAdd
 * @param {Buffer|string} root - The UID of the root organization (Buffer or hex string)
 * @param {Buffer|string} user - The UID of the user requesting the action
 * @param {string} type - The type of action to be performed
 * @param {Buffer|string} UID - The UID of the object being acted upon
 * @param {Buffer|string} UIDBelongsTo - The UID of the parent object
 * @param {Buffer|string|null} oldTarget - The UID of the old target object (for migrations/removals)
 * @param {Buffer|string|null} newTarget - The UID of the new target object (for additions/migrations)
 * @param {number|null} [timestamp=null] - Optional timestamp for the action
 * @returns {Promise<void>}
 */
export const queueAdd = async (root, user, type, UID, UIDBelongsTo, oldTarget, newTarget, timestamp = null) => {
    // Validate required parameters: nothing is inserted without a root and a user
    if (!root || !user) {
        errorLoggerUpdate(new Error(`queueAdd: Invalid parameters - root: ${root ? 'valid' : 'undefined'}, user: ${user ? 'valid' : 'undefined'}`));
        return;
    }

    try {
        const rowValues = [root, user, type, UID, UIDBelongsTo, oldTarget, newTarget, timestamp];
        await query(
            `INSERT INTO TreeQueue(UIDRoot,UIDuser,Type,UIDObjectID,UIDBelongsTo,UIDoldTarget,UIDnewTarget,timestamp) 
                VALUES (?,?,?,?,?,?,?,FROM_UNIXTIME(?))`,
            rowValues
        );
        // started but not awaited: the caller does not wait for the queue to drain
        triggerQueue(root);
    } catch (e) {
        errorLoggerUpdate(e || new Error('queueAdd: Unknown error occurred'));
    }
};


/**
 * Adds multiple actions to the tree queue in batch and triggers queue processing.
 *
 * Root and user are taken from the session and converted with UUID2hex. All actions are
 * written with one batch insert; afterwards triggerQueue is started for the root without
 * being awaited. An empty `adds` array does nothing. Errors are reported through
 * errorLoggerUpdate and are not thrown to the caller.
 *
 * An omitted `timestamp` is bound as null (the same value queueAdd uses by default), so
 * the bind parameters never contain `undefined`.
 *
 * @async
 * @function queueAddArray
 * @param {Object} req - Express request object with session data
 * @param {Object} req.session - Session object
 * @param {string} req.session.root - The root organization UUID
 * @param {string} req.session.user - The user UUID
 * @param {Array<{type: string, UID: Buffer|string, UIDBelongsTo: Buffer|string, oldTarget: Buffer|string|null, newTarget: Buffer|string|null, timestamp?: number}>} adds - Array of action objects to add to queue
 * @returns {Promise<void>}
 */
export const queueAddArray = async (req, adds) => {
    try {
        const root = UUID2hex(req.session.root);
        const user = UUID2hex(req.session.user);

        if (adds.length > 0) {
            // one parameter row per action; timestamp is optional and falls back to null
            const rows = adds.map((a) => [
                root,
                user,
                a.type,
                a.UID,
                a.UIDBelongsTo,
                a.oldTarget,
                a.newTarget,
                a.timestamp ?? null,
            ]);
            await query(`INSERT INTO TreeQueue (UIDRoot,UIDuser,Type,UIDObjectID,UIDBelongsTo,UIDoldTarget,UIDnewTarget,timestamp) 
                    VALUES (?,?,?,?,?,?,?,FROM_UNIXTIME(?))`,
                    rows,
                    { batch: true });

            // started but not awaited: the caller does not wait for the queue to drain
            triggerQueue(root);
        }
    } catch (e) {
        errorLoggerUpdate(e || new Error('queueAddArray: Unknown error occurred'));
    }
};



/**
 * Loads the action handlers and starts queue processing for every root organization.
 *
 * The add / remove / migrate handlers are imported dynamically, one after the other, and
 * stored in the module-level handler variables used by triggerQueue. A failing import is
 * not caught here and rejects the returned promise.
 *
 * Afterwards the root organizations are selected (groups with hierarchie=1 that carry a
 * 'memberSys' link to themselves) and triggerQueue is started for each of them without
 * being awaited. Errors of this second step are reported through errorLoggerUpdate.
 *
 * @async
 * @function triggerQueues
 * @returns {Promise<void>}
 */
export const triggerQueues = async () => {
    // NOTE(review): handlers are loaded at call time rather than via static imports,
    // presumably to avoid a circular import between the modules — confirm
    ({ addAction } = await import('./treeAdd.js'));
    ({ removeAction } = await import('./treeRemove.js'));
    ({ migrateAction } = await import('./treeMigrate.js'));

    try {
        const rootOrgas = await query(`SELECT ObjectBase.UID FROM ObjectBase
            INNER JOIN Links ON (Links.UID=ObjectBase.UID AND Links.UIDTarget=ObjectBase.UID AND Links.Type IN ('memberSys'))  
            WHERE ObjectBase.Type='group' AND hierarchie=1  ;`, []);

        for (const { UID } of rootOrgas) {
            // each queue is started independently; the runs are not awaited here
            triggerQueue(UID);
        }
    } catch (e) {
        errorLoggerUpdate(e || new Error('queueTriggerAll: Unknown error occurred'));
    }
};