TasksService Documentation
Table of Contents
- Overview
- Architecture
- Environment Configuration
- API Reference
- Usage in Extensions
- Best Practices
- Troubleshooting
Overview
The TasksService is a built-in service that manages background tasks efficiently by caching "Not started" tasks from the baasix_Tasks table and coordinating task execution across the system. It reduces database calls and provides atomic task claiming, distributed job locking, stall recovery, and configurable concurrency.
Key Features
- Intelligent Caching: Keeps "Not started" tasks in Redis cache to minimize database queries
- Time-Filtered Caching: Only caches tasks scheduled within 4 hours to reduce memory usage
- Automatic Refresh: Periodically refreshes cache based on configurable intervals (max 3 hours)
- Change Detection: Automatically invalidates cache when tasks are created, updated, or deleted
- Atomic Task Claiming: claimTask() uses atomic DB updates to prevent duplicate processing
- Distributed Job Locking: acquireJobLock() / releaseJobLock() for locking any scheduled job across instances
- Concurrency Control: Configurable TASK_CONCURRENCY for parallel task processing per instance
- Stall Recovery: Automatically recovers tasks stuck in "Running" state with optional retry
- Graceful Shutdown: Waits for running tasks to complete, releases all locks during shutdown
- Extension Integration: Available in extensions via context and direct import
Architecture
baasix_Tasks Table Structure
The TasksService works with the built-in baasix_Tasks table, which includes:
- id: Primary key (auto-increment)
- task_status: ENUM with values:
  - "Not started" - Tasks available for processing
  - "Running" - Currently executing tasks
  - "Completed" - Successfully finished tasks
  - "Error" - Failed tasks
- type: String field defining the task type
- scheduled_time: DateTime field for when the task should be executed
- started_at: DateTime field auto-set when the task starts running (used for stall detection)
- max_retries: Integer field for maximum retry attempts (default: 0)
- retry_count: Integer field tracking retry attempts (system-generated)
- attachment_id: Optional file attachment reference
- task_data: JSON field for task-specific data
- result_data: JSON field for storing task results
- error_data: JSON field for storing error information
- Standard audit fields (userCreated, userUpdated, createdAt, updatedAt)
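The field list above can be sketched as a minimal task row. Note that `buildTask` and its defaults are illustrative helpers for this document, not part of the service API:

```javascript
// Minimal sketch of a baasix_Tasks row as an extension might create one.
// buildTask and its defaults are illustrative, not a service API.
function buildTask(type, taskData = {}) {
  return {
    type,                                // task type discriminator, e.g. "email_batch"
    task_status: 'Not started',          // must start as "Not started" to be picked up
    scheduled_time: new Date().toISOString(),
    max_retries: 0,                      // default: no automatic retry after a stall
    retry_count: 0,                      // system-managed counter, shown for clarity
    task_data: JSON.stringify(taskData), // JSON payload for the task handler
  };
}

const task = buildTask('email_batch', { batch_size: 100 });
```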
Service Architecture
Single Instance Mode:
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Extensions │───▶│ TasksService │───▶│ Cache Service │
│ │ │ │ │ (Memory/Redis) │
│ - Schedule Exts │ │ - claimTask │ │ │
│ - Hook Exts │ │ - acquireJobLock │ │ - Task Cache │
│ - Custom Logic │ │ - releaseJobLock │ │ - Lock State │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ baasix_Tasks │
│ PostgreSQL │
└─────────────────┘

Multi-Instance Mode (with Redis):
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Instance 1 │───▶│ TasksService │───▶│ Task Redis │
│ Instance 2 │ │ │ │ (Distributed) │
│ Instance 3 │ │ - claimTask │ │ │
│ ... │ │ - acquireJobLock │ │ - SETNX Lock │
└─────────────────┘ │ - releaseJobLock │ │ - Auto-Renewal │
│ - stallRecovery │ │ - Job Locks │
└──────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ baasix_Tasks │
│ PostgreSQL │
└─────────────────┘

When TASK_REDIS_ENABLED=true, distributed locking ensures only one instance processes tasks at a time, and acquireJobLock() prevents the same scheduled job from running on multiple instances simultaneously.
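The SETNX-style lock behind this can be illustrated with a toy in-memory model. `FakeRedis` is a stand-in for this sketch, not the real client:

```javascript
// Toy in-memory model of Redis "SET key value NX EX ttl" — the primitive
// behind the distributed task lock. Real deployments use an actual Redis
// client; this only shows why exactly one instance wins the lock.
class FakeRedis {
  constructor() {
    this.store = new Map();
  }
  // Returns 'OK' when the key was set, null when a live key already exists (NX).
  setNxEx(key, value, ttlSeconds) {
    const entry = this.store.get(key);
    if (entry && entry.expiresAt > Date.now()) return null;
    this.store.set(key, { value, expiresAt: Date.now() + ttlSeconds * 1000 });
    return 'OK';
  }
}

const redis = new FakeRedis();
const firstAttempt = redis.setNxEx('task-lock', 'instance-1', 60);  // lock acquired
const secondAttempt = redis.setNxEx('task-lock', 'instance-2', 60); // already held
```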
Environment Configuration
Configure TasksService behavior with these environment variables:
Required Variables
# Enable the task service
TASK_SERVICE_ENABLED=true

Note: TasksService uses the system cache, which is always initialized with the configured adapter (memory by default, or Redis/Upstash if SYSTEM_CACHE_ADAPTER is set). The DATA_CACHE_ENABLED environment variable only affects query caching, not task caching.
Optional Variables
# Task cache refresh interval in seconds (default: 600, maximum: 10800 = 3 hours)
TASK_LIST_REFRESH_INTERVAL=600
# Maximum time to wait for running tasks during shutdown in seconds (default: 30)
TASK_SHUTDOWN_WAIT_TIME=30
# Maximum concurrent tasks per instance (default: 1)
TASK_CONCURRENCY=1
# Seconds before a Running task is considered stalled (default: 300, minimum: 60)
# Stalled tasks are auto-recovered (retried if max_retries > 0, else marked Error)
TASK_STALL_TIMEOUT=300

Multi-Instance Configuration (Redis)
For multi-instance deployments (PM2 cluster, Kubernetes, etc.), enable Redis-based distributed locking:
# Enable Redis for distributed task locking
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

Why separate Redis configurations?
| Purpose | Config | Use Case |
|---|---|---|
| Socket.IO Clustering | SOCKET_REDIS_ENABLED + SOCKET_REDIS_URL | WebSocket scaling across instances |
| System Cache | SYSTEM_CACHE_ADAPTER=redis + SYSTEM_CACHE_REDIS_URL | Shared system cache (tasks, permissions, settings) |
| Data Cache | DATA_CACHE_ENABLED=true + DATA_CACHE_ADAPTER=redis | Cache database query results |
| Task Locking | TASK_REDIS_ENABLED + TASK_REDIS_URL | Prevent duplicate task execution |
This separation allows you to:
- Run distributed tasks with in-memory cache (default)
- Share task cache across instances using SYSTEM_CACHE_ADAPTER=redis
- Use different Redis instances for different purposes
- Enable only the Redis features you need
Important: TASK_REDIS_ENABLED is for distributed locking only (ensuring only one instance processes tasks). For a shared task cache across instances, set SYSTEM_CACHE_ADAPTER=redis.
Example .env Configuration
Single Instance (Memory Cache)
TASK_SERVICE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60
TASK_CONCURRENCY=3
TASK_STALL_TIMEOUT=300

Multi-Instance (With Redis for Distributed Locking)
TASK_SERVICE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60
# Distributed locking for multi-instance (prevents duplicate task execution)
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

With Redis Cache Adapter (Optional)
If you want task caching to use Redis instead of memory (useful for shared cache across instances):
TASK_SERVICE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
# Use Redis for the system cache adapter (affects all internal caching including tasks)
SYSTEM_CACHE_ADAPTER=redis
SYSTEM_CACHE_REDIS_URL=redis://localhost:6379
# Distributed locking (separate from cache)
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

Important Limitations
- Refresh Interval: Maximum allowed refresh interval is 3 hours (10800 seconds) to prevent missing tasks
- Task Time Window: Only tasks with scheduled_time within 4 hours of the current time are cached
- Memory Optimization: Time filtering reduces memory usage by excluding far-future tasks
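The 4-hour window rule can be expressed as a small predicate. `isCacheable` is an illustrative sketch for this document, not the service's internal code:

```javascript
// Sketch of the caching rule: only "Not started" tasks whose scheduled_time
// falls within the next 4 hours are kept in the cache.
const CACHE_WINDOW_MS = 4 * 60 * 60 * 1000; // 4 hours

function isCacheable(task, now = Date.now()) {
  if (task.task_status !== 'Not started') return false;
  return new Date(task.scheduled_time).getTime() <= now + CACHE_WINDOW_MS;
}

const soon = {
  task_status: 'Not started',
  scheduled_time: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // in 1 hour
};
const farFuture = {
  task_status: 'Not started',
  scheduled_time: new Date(Date.now() + 5 * 60 * 60 * 1000).toISOString(), // in 5 hours
};
```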
API Reference
Task Claiming
claimTask(taskId)
Atomically claim a task for processing. Uses UPDATE ... WHERE task_status = 'Not started' to prevent duplicate processing across instances. If another worker already claimed the task, returns null.
const claimed = await tasksService.claimTask(taskId);
if (!claimed) {
console.log('Task already claimed by another worker');
return;
}
// Process the claimed task...

Parameters:
- taskId (string | number): The ID of the task to claim
Returns: Promise<BackgroundTask | null> - The claimed task record, or null if already claimed/not found
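The claim semantics can be modeled in miniature: a conditional update where only the first caller wins. `claimTaskMock` below is a mock against a `Map`, standing in for the real `UPDATE ... WHERE task_status = 'Not started'` against PostgreSQL:

```javascript
// Miniature model of the atomic claim. Only the first caller flips the row
// from "Not started" to "Running"; every later caller gets null.
function claimTaskMock(db, taskId) {
  const row = db.get(taskId);
  if (!row || row.task_status !== 'Not started') return null; // lost the race
  row.task_status = 'Running';
  row.started_at = new Date().toISOString();
  return { ...row };
}

const db = new Map([[1, { id: 1, task_status: 'Not started' }]]);
const winner = claimTaskMock(db, 1); // claimed task object
const loser = claimTaskMock(db, 1);  // null — already claimed
```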
Task List
getNotStartedTasks()
Retrieves cached "Not started" tasks from the baasix_Tasks table that are scheduled within 4 hours.
const tasks = await tasksService.getNotStartedTasks();
// Returns: Array of task objects with task_status: "Not started" and scheduled_time within 4 hours

Returns: Promise<BackgroundTask[]> - Array of task objects (filtered by scheduled_time)
Throws: Never throws, returns empty array on error
Note: Only returns tasks scheduled within 4 hours from current time
Distributed Job Locking
acquireJobLock(jobName, ttlSeconds?)
Acquire a named distributed lock for a scheduled job. Prevents the same job from running on multiple instances simultaneously.
- With Redis (TASK_REDIS_ENABLED=true): uses SET NX EX for cross-instance locking
- Without Redis: uses an in-memory set (prevents re-entry within the same process)
const locked = await tasksService.acquireJobLock('attendance-cron', 900);
if (!locked) {
console.log('Job already running on another instance');
return;
}
try {
// Process job...
} finally {
await tasksService.releaseJobLock('attendance-cron');
}

Parameters:
- jobName (string): Unique job identifier (e.g., "attendance-cron", "cleanup-job")
- ttlSeconds (number, optional): Lock TTL in seconds. Should be >= your job's max execution time. Default: 300
Returns: Promise<boolean> - true if lock acquired, false if already held
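One heuristic for choosing `ttlSeconds` (an assumption for this sketch, not a service API) is to cover both the job's expected runtime and the cron interval, so the lock cannot expire while the job is still running:

```javascript
// Heuristic TTL choice: whichever is longer, the cron interval or the
// job's worst-case runtime. This is illustrative, not part of TasksService.
function lockTtlSeconds(cronIntervalSeconds, expectedRuntimeSeconds) {
  return Math.max(cronIntervalSeconds, expectedRuntimeSeconds);
}

// 15-minute cron that may run for up to 10 minutes → 900s TTL
const ttl = lockTtlSeconds(900, 600);
```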
releaseJobLock(jobName)
Release a named job lock. Only releases if this instance owns the lock (atomic via Lua script in Redis mode).
await tasksService.releaseJobLock('attendance-cron');

Parameters:
- jobName (string): The job name used in acquireJobLock()
Returns: Promise<boolean> - true if released, false if not owned or error
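The owner check can be sketched as a compare-then-delete. In Redis mode the real service performs this atomically in a single Lua script; this in-process mock doesn't need that atomicity:

```javascript
// Sketch of the owner-checked release: delete the lock key only if its
// value still matches this instance's ID.
function releaseIfOwner(store, key, instanceId) {
  if (store.get(key) !== instanceId) return false; // not ours (or expired)
  store.delete(key);
  return true;
}

const locks = new Map([['job:attendance-cron', 'instance-A']]);
const wrongOwner = releaseIfOwner(locks, 'job:attendance-cron', 'instance-B'); // false
const released = releaseIfOwner(locks, 'job:attendance-cron', 'instance-A');   // true
```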
Instance Lock (Task Processing)
tryAcquireLock(lockTimeout?)
Acquire a task processing slot. Respects TASK_CONCURRENCY — allows up to N concurrent tasks per instance. In multi-instance mode, only one instance can hold the processing lock at a time.
const acquired = await tasksService.tryAcquireLock();
if (!acquired) return;
try {
// Process tasks...
} finally {
await tasksService.releaseLock();
}

Parameters:
- lockTimeout (number, optional): Redis lock TTL in seconds (default: 60)
Returns: Promise<boolean> - true if slot acquired, false if at capacity or lock held by another instance
releaseLock()
Release a task processing slot. When all slots are released, the instance lock (Redis) is also released.
Returns: Promise<boolean> - true if released
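The slot behavior can be modeled as a simple counter: up to TASK_CONCURRENCY slots may be held at once, and releasing the last slot is the point where the instance lock would also be freed. `SlotGate` is an illustrative name for this sketch, not a service class:

```javascript
// Slot-counter model of the per-instance concurrency gate.
class SlotGate {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
  }
  tryAcquire() {
    if (this.running >= this.concurrency) return false; // at capacity
    this.running += 1;
    return true;
  }
  // Returns true when the last slot was released (all work finished).
  release() {
    if (this.running > 0) this.running -= 1;
    return this.running === 0;
  }
}

const gate = new SlotGate(2); // TASK_CONCURRENCY=2
const slotA = gate.tryAcquire(); // true
const slotB = gate.tryAcquire(); // true
const slotC = gate.tryAcquire(); // false — at capacity
```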
Legacy Methods (Deprecated)
setTaskRunning(isRunning) (deprecated)
Use tryAcquireLock() / releaseLock() instead. Still functional — delegates to the new methods internally.
// ❌ Deprecated
await tasksService.setTaskRunning(true);
// ✅ Preferred
const acquired = await tasksService.tryAcquireLock();

isTaskRunning() (deprecated)
Returns true when running task count >= configured TASK_CONCURRENCY. Still functional.
Utility Methods
forceRefresh()
Manually triggers a cache refresh, useful for testing or immediate updates.
await tasksService.forceRefresh();

Returns: Promise<void>
getCacheStats()
Returns statistics about the cache and service status.
const stats = await tasksService.getCacheStats();
// {
// cachedTasksCount: 5,
// runningCount: 0,
// concurrency: 3,
// isAtCapacity: false,
// stallTimeout: 300,
// refreshInterval: 600000,
// initialized: true,
// distributedMode: true,
// hasInstanceLock: false,
// instanceId: "a1b2c3d4"
// }

Returns: Promise<Object> - Cache statistics object
recoverStalledTasks()
Recover tasks stuck in "Running" state beyond the stall timeout. Called automatically during initialization and each periodic cache refresh.
- If retry_count < max_retries: resets the task to "Not started" for automatic retry
- Otherwise: marks the task as "Error" with stall information
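The recovery rules can be captured as a pure decision function. Field names follow the baasix_Tasks table and `stallTimeoutSeconds` mirrors TASK_STALL_TIMEOUT, but `recoverStalledTask` itself is a sketch, not the service's implementation:

```javascript
// Pure-function sketch of the stall-recovery decision.
function recoverStalledTask(task, stallTimeoutSeconds, now = Date.now()) {
  const runningForSeconds = (now - new Date(task.started_at).getTime()) / 1000;
  if (task.task_status !== 'Running' || runningForSeconds < stallTimeoutSeconds) {
    return null; // not stalled — leave the task untouched
  }
  return task.retry_count < task.max_retries
    ? { task_status: 'Not started', retry_count: task.retry_count + 1 } // retry
    : { task_status: 'Error', error_data: { message: 'Task stalled' } }; // give up
}

const stalled = {
  task_status: 'Running',
  started_at: new Date(Date.now() - 600 * 1000).toISOString(), // running for 10 min
  retry_count: 0,
  max_retries: 3,
};
```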
Usage in Extensions
Scenario 1: Processing Background Tasks with Atomic Claiming
Use claimTask() for safe per-task processing that works across multiple instances:
// extensions/baasix-schedule-task-processor/index.js
import schedule from 'node-schedule';
export default async function (hooksManager, context) {
const { tasksService, ItemsService } = context;
// Run every minute
schedule.scheduleJob('* * * * *', async function () {
const tasks = await tasksService.getNotStartedTasks();
const taskService = new ItemsService('baasix_Tasks');
for (const task of tasks) {
// Atomic claim — returns null if already taken by another worker
const claimed = await tasksService.claimTask(task.id);
if (!claimed) continue;
try {
if (claimed.type === 'email_batch') {
await processBatchEmails(claimed);
} else if (claimed.type === 'data_export') {
await processDataExport(claimed);
}
await taskService.updateOne(task.id, {
task_status: 'Completed',
result_data: { processed_at: new Date().toISOString() },
});
} catch (error) {
await taskService.updateOne(task.id, {
task_status: 'Error',
error_data: { message: error.message },
});
}
}
});
}

Scenario 2: Locking a Recurring Scheduled Job
Use acquireJobLock() / releaseJobLock() for cron jobs that are NOT task-table based but need single-instance execution:
// extensions/baasix-schedule-attendance/index.js
import schedule from 'node-schedule';
import moment from 'moment';
// AttendanceUtils is assumed to be an app-specific helper module;
// the import path below is a placeholder for wherever it lives in your project.
import AttendanceUtils from './attendance-utils.js';
export default async function (hooksManager, context) {
const { tasksService } = context;
// Every 15 minutes: mark absent for ended classes
schedule.scheduleJob('*/15 * * * *', async function () {
// Acquire a named lock — only one instance runs this job
const locked = await tasksService.acquireJobLock('attendance-cron', 900);
if (!locked) return; // another instance is running this job
try {
const checkStart = moment().subtract(1, 'hours').toDate().toISOString();
const checkEnd = moment().subtract(30, 'minutes').toDate().toISOString();
await AttendanceUtils.ProcessScheduleAttendance({
endtime: { between: [checkStart, checkEnd] },
attendance_enabled: { ne: false },
});
await AttendanceUtils.ScheduleAttendanceCalculation();
await AttendanceUtils.CalculateConsecutiveDaysAttendance();
} catch (error) {
console.error('Attendance cron error:', error);
} finally {
await tasksService.releaseJobLock('attendance-cron');
}
});
}

Scenario 3: Using Instance Lock for Sequential Processing
Use tryAcquireLock() / releaseLock() when you want the legacy single-lock gate pattern:
// extensions/baasix-schedule-task-processor/index.js
import schedule from 'node-schedule';
export default async function (hooksManager, context) {
const { tasksService, ItemsService } = context;
schedule.scheduleJob('* * * * *', async function () {
// Only one cron tick runs at a time per instance
const acquired = await tasksService.tryAcquireLock(60);
if (!acquired) return;
try {
const tasks = await tasksService.getNotStartedTasks();
const taskService = new ItemsService('baasix_Tasks');
for (const task of tasks) {
await taskService.updateOne(task.id, { task_status: 'Running' });
// Process task...
await taskService.updateOne(task.id, { task_status: 'Completed' });
}
} finally {
await tasksService.releaseLock();
}
});
}

Hook Extension Example
Create tasks based on events:
// extensions/baasix-hook-task-creator/index.js
export default (hooksManager, context) => {
const { tasksService, ItemsService } = context;
hooksManager.registerHook('data_imports', 'items.create.after', async ({ data }) => {
if (data.status === 'completed' && data.record_count > 1000) {
const tasksItemsService = new ItemsService('baasix_Tasks', {
accountability: { bypassPermissions: true },
});
await tasksItemsService.createOne({
type: 'cleanup_temp_files',
task_status: 'Not started',
scheduled_time: new Date(),
max_retries: 3, // Auto-retry up to 3 times if stalled
task_data: JSON.stringify({ import_id: data.id }),
});
// Force refresh cache for immediate availability
await tasksService.forceRefresh();
}
return data;
});
};

Best Practices
1. Use claimTask() for Task Processing
// ✅ Best - Atomic per-task claim, safe across multiple instances
for (const task of tasks) {
const claimed = await tasksService.claimTask(task.id);
if (!claimed) continue;
// Process task...
}
// ⚠️ Legacy - Check-then-act pattern has a race window
if (await tasksService.isTaskRunning()) return;
await tasksService.setTaskRunning(true);

2. Use acquireJobLock() for Non-Task Cron Jobs
// ✅ Best - Named lock, works across instances
const locked = await tasksService.acquireJobLock('my-cron-job', 600);
if (!locked) return;
try {
/* ... */
} finally {
await tasksService.releaseJobLock('my-cron-job');
}
// ❌ Bad - No protection against multi-instance execution
schedule.scheduleJob('*/5 * * * *', async () => {
await doWork(); // Runs on every instance!
});

3. Set TTL >= Max Execution Time
// ✅ Good - 15min cron, lock TTL = 15min (900s)
await tasksService.acquireJobLock('cleanup', 900);
// ❌ Bad - If job takes 10min, lock expires at 5min, another instance starts
await tasksService.acquireJobLock('cleanup', 300);

4. Configure Stall Recovery with Retries
// When creating tasks, set max_retries for auto-recovery from stalls
await tasksItemsService.createOne({
type: 'email_batch',
task_status: 'Not started',
scheduled_time: new Date(),
max_retries: 3, // Auto-retry up to 3 times if stalled (based on TASK_STALL_TIMEOUT)
});

5. Use Appropriate Refresh Intervals
# ✅ Good - For high-frequency task creation
TASK_LIST_REFRESH_INTERVAL=60 # 1 minute
# ✅ Good - For normal usage
TASK_LIST_REFRESH_INTERVAL=1800 # 30 minutes
# ⚠️ Maximum enforced - Longest allowed interval
TASK_LIST_REFRESH_INTERVAL=10800 # 3 hours (system maximum)

Troubleshooting
Common Issues
1. Tasks Not Appearing in Cache
Symptoms: getNotStartedTasks() returns empty array despite having "Not started" tasks in database.
Solutions:
// Check service initialization
const stats = await tasksService.getCacheStats();
console.log('Service initialized:', stats.initialized);
// Force refresh cache
await tasksService.forceRefresh();
// Verify: is the task scheduled within 4 hours?
// Only tasks with scheduled_time <= now + 4 hours are cached

2. Tasks Stuck in "Running" State
Symptoms: Tasks remain in "Running" status and are not processed.
Solutions:
The stall recovery system handles this automatically:
- TASK_STALL_TIMEOUT (default: 300s) defines how long before a Running task is considered stalled
- Stalled tasks with max_retries > retry_count are reset to "Not started" for retry
- Stalled tasks with no retries left are marked as "Error"
Recovery runs during initialization and each periodic cache refresh.
# Reduce stall timeout for faster recovery
TASK_STALL_TIMEOUT=120

3. Same Job Running on Multiple Instances
Symptoms: Cron jobs execute on every instance in a multi-instance deployment.
Solutions:
// Use acquireJobLock for any cron job that should run on only one instance
const locked = await tasksService.acquireJobLock('my-job', 600);
if (!locked) return;
try {
/* ... */
} finally {
await tasksService.releaseJobLock('my-job');
}

Requires TASK_REDIS_ENABLED=true for cross-instance coordination.
4. Duplicate Task Processing
Symptoms: The same task is processed by multiple workers.
Solutions:
// Use claimTask() instead of manual status updates
const claimed = await tasksService.claimTask(task.id);
if (!claimed) continue; // Already claimed by another worker
// claimTask() uses atomic UPDATE ... WHERE task_status = 'Not started'
// Only one worker wins the claim

Debugging Tips
Enable Debug Logging
LOG_LEVEL=debug

Monitor Cache Stats
setInterval(async () => {
const stats = await tasksService.getCacheStats();
console.log('TasksService Stats:', stats);
}, 30000);

Related Documentation
- Baasix Extensions - Creating custom extensions
- Plugins - Plugin system with full service access
- Hooks System - Understanding the hooks system
- Additional Features - Cache management
- Settings Reference - All environment configuration