TasksService Documentation
Table of Contents
- Overview
- Architecture
- Environment Configuration
- API Reference
- Usage in Extensions
- Best Practices
- Troubleshooting
Overview
The TasksService is a built-in service that manages background tasks efficiently by caching "Not started" tasks from the baasix_Tasks table and coordinating task execution across the system. It reduces database calls and ensures only one task runs at a time.
Key Features
- Intelligent Caching: Keeps "Not started" tasks in Redis cache to minimize database queries
- Time-Filtered Caching: Only caches tasks scheduled within 4 hours to reduce memory usage
- Automatic Refresh: Periodically refreshes cache based on configurable intervals (max 3 hours)
- Change Detection: Automatically invalidates cache when tasks are created, updated, or deleted
- Task Coordination: Provides global state management to prevent concurrent task execution
- Graceful Shutdown: Waits for running tasks to complete during server shutdown
- Extension Integration: Easily integrates with custom extensions for task processing
Architecture
baasix_Tasks Table Structure
The TasksService works with the built-in baasix_Tasks table, which includes:
id: Primary key (auto-increment)task_status: ENUM with values:"Not started"- Tasks available for processing"Running"- Currently executing tasks"Completed"- Successfully finished tasks"Error"- Failed tasks
type: String field defining the task typescheduled_time: DateTime field for when the task should be executedattachment_id: Optional file attachment referencetask_data: JSON field for task-specific dataresult_data: JSON field for storing task resultserror_data: JSON field for storing error information- Standard audit fields (userCreated, userUpdated, createdAt, updatedAt)
Service Architecture
Single Instance Mode:
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Extensions │───▶│ TasksService │───▶│ Cache Service │
│ │ │ │ │ (Memory/Redis) │
│ - Schedule Exts │ │ - getNotStarted │ │ │
│ - Hook Exts │ │ - tryAcquireLock │ │ - Task Cache │
│ - Custom Logic │ │ - releaseLock │ │ - Lock State │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ baasix_Tasks │
│ PostgreSQL │
└─────────────────┘Multi-Instance Mode (with Redis):
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Instance 1 │───▶│ TasksService │───▶│ Task Redis │
│ Instance 2 │ │ │ │ (Distributed) │
│ Instance 3 │ │ - tryAcquireLock │ │ │
│ ... │ │ - releaseLock │ │ - SETNX Lock │
└─────────────────┘ │ - lockRenewal │ │ - Auto-Renewal │
└──────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ baasix_Tasks │
│ PostgreSQL │
└─────────────────┘When TASK_REDIS_ENABLED=true, only one instance can hold the task processing lock at a time, preventing duplicate task execution across instances.
Environment Configuration
Configure TasksService behavior with these environment variables:
Required Variables
# Enable the task service
TASK_SERVICE_ENABLED=true
# Cache must be enabled for task caching
CACHE_ENABLED=trueOptional Variables
# Task cache refresh interval in seconds (default: 600, maximum: 10800 = 3 hours)
TASK_LIST_REFRESH_INTERVAL=600
# Maximum time to wait for running tasks during shutdown in seconds (default: 30)
TASK_SHUTDOWN_WAIT_TIME=30Multi-Instance Configuration (Redis)
For multi-instance deployments (PM2 cluster, Kubernetes, etc.), enable Redis-based distributed locking:
# Enable Redis for distributed task locking
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379Why separate Redis configuration?
| Purpose | Config | Use Case |
|---|---|---|
| Socket.IO Clustering | SOCKET_REDIS_ENABLED + SOCKET_REDIS_URL | WebSocket scaling |
| Query Caching | CACHE_ADAPTER=redis + CACHE_REDIS_URL | Performance optimization |
| Task Locking | TASK_REDIS_ENABLED + TASK_REDIS_URL | Prevent duplicate task execution |
This separation allows you to:
- Run distributed tasks with in-memory cache
- Use different Redis instances for different purposes
- Enable only the Redis features you need
Example .env Configuration
Single Instance (No Redis)
TASK_SERVICE_ENABLED=true
CACHE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60Multi-Instance (With Redis)
TASK_SERVICE_ENABLED=true
CACHE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60
# Distributed locking for multi-instance
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379Important Limitations
- Refresh Interval: Maximum allowed refresh interval is 3 hours (10800 seconds) to prevent missing tasks
- Task Time Window: Only tasks with
scheduled_timewithin 4 hours from current time are cached - Memory Optimization: Time filtering reduces memory usage by excluding far-future tasks
API Reference
Core Methods
getNotStartedTasks()
Retrieves cached "Not started" tasks from the baasix_Tasks table that are scheduled within 4 hours.
const tasks = await tasksService.getNotStartedTasks();
// Returns: Array of task objects with task_status: "Not started" and scheduled_time within 4 hoursReturns: Promise<Array> - Array of task objects (filtered by scheduled_time)
Throws: Never throws, returns empty array on error
Note: Only returns tasks scheduled within 4 hours from current time
setTaskRunning(isRunning)
Sets the global task running state to coordinate task execution.
await tasksService.setTaskRunning(true); // Mark task as running
await tasksService.setTaskRunning(false); // Mark task as not runningParameters:
isRunning(boolean):trueto mark a task as running,falsewhen done
Returns: Promise<void>
isTaskRunning()
Checks if any task is currently running in the system.
const isRunning = await tasksService.isTaskRunning();
if (isRunning) {
console.log('A task is already running');
return; // Skip task execution
}Returns: Promise<boolean> - true if a task is running, false otherwise
forceRefresh()
Manually triggers a cache refresh, useful for testing or immediate updates.
await tasksService.forceRefresh();
console.log('Cache refreshed manually');Returns: Promise<void>
getCacheStats()
Returns statistics about the cache and service status.
const stats = await tasksService.getCacheStats();
console.log(stats);
// {
// cachedTasksCount: 5,
// isTaskRunning: false,
// refreshInterval: 600000,
// refreshIntervalSeconds: 600,
// maxRefreshIntervalSeconds: 10800,
// taskTimeWindow: "4 hours",
// initialized: true,
// lastRefreshed: "2023-12-01T10:30:00.000Z"
// }Returns: Promise<Object> - Cache statistics object
Usage in Extensions
Schedule Extension Example
Create a schedule extension that processes tasks periodically:
// extensions/baasix-schedule-task-processor/index.js
import schedule from 'node-schedule';
import { TasksService, ItemsService } from '@baasix/baasix';
const tasksService = new TasksService();
// Run every 5 minutes
const job = schedule.scheduleJob('*/5 * * * *', async function () {
console.log('Task processor started');
// Check if another task is already running
if (await tasksService.isTaskRunning()) {
console.log('Task already running, skipping...');
return;
}
// Mark task as running
await tasksService.setTaskRunning(true);
try {
// Get available tasks
const tasks = await tasksService.getNotStartedTasks();
if (tasks.length === 0) {
console.log('No tasks to process');
return;
}
console.log(`Processing ${tasks.length} tasks`);
// Process first available task
const task = tasks[0];
await processTask(task);
} catch (error) {
console.error('Task processing error:', error);
} finally {
// Always mark task as not running
await tasksService.setTaskRunning(false);
}
});
async function processTask(task) {
// Use ItemsService for task updates
const tasksItemsService = new ItemsService('baasix_Tasks', {
accountability: { bypassPermissions: true },
});
try {
// Update task status to Running
await tasksItemsService.updateOne(task.id, {
task_status: 'Running',
});
// Process task based on task.type
switch (task.type) {
case 'email_batch':
await processBatchEmails(task);
break;
case 'data_export':
await processDataExport(task);
break;
case 'cleanup':
await processCleanup(task);
break;
default:
throw new Error(`Unknown task type: ${task.type}`);
}
// Mark task as completed
await tasksItemsService.updateOne(task.id, {
task_status: 'Completed',
});
console.log(`Task ${task.id} completed successfully`);
} catch (error) {
// Mark task as error
await tasksItemsService.updateOne(task.id, {
task_status: 'Error',
error_message: error.message,
});
console.error(`Task ${task.id} failed:`, error);
throw error;
}
}
export default job;Hook Extension Example
Create a hook that creates tasks based on certain events:
// extensions/baasix-hook-task-creator/index.js
import { ItemsService, TasksService } from '@baasix/baasix';
const tasksService = new TasksService();
export default (hooksManager, context) => {
// Create a task when a large data import is completed
hooksManager.registerHook('data_imports', 'items.create.after', async ({ data }) => {
if (data.status === 'completed' && data.record_count > 1000) {
// Use ItemsService to create tasks
const tasksItemsService = new ItemsService('baasix_Tasks', {
accountability: { bypassPermissions: true },
});
// Create a cleanup task
await tasksItemsService.createOne({
type: 'cleanup_temp_files',
task_status: 'Not started',
metadata: JSON.stringify({
import_id: data.id,
temp_files: data.temp_files,
}),
});
console.log(`Created cleanup task for import ${data.id}`);
// Optionally force refresh cache for immediate availability
await tasksService.forceRefresh();
}
return data;
});
};Best Practices
1. Always Use try/finally for Task State
// ✅ Good - Always reset running state
await tasksService.setTaskRunning(true);
try {
// Process tasks
} finally {
await tasksService.setTaskRunning(false);
}
// ❌ Bad - State might not be reset on error
await tasksService.setTaskRunning(true);
// Process tasks
await tasksService.setTaskRunning(false);2. Check Running State Before Starting Tasks
// ✅ Good - Check before starting
if (await tasksService.isTaskRunning()) {
return; // Skip if already running
}
// ❌ Bad - Multiple tasks might run concurrently
// Start processing without checking3. Handle Task Status Updates Properly
// ✅ Good - Update task status in database
await TasksModel.update({ task_status: 'Running' }, { where: { id: task.id } });
// Process task...
await TasksModel.update({ task_status: 'Completed' }, { where: { id: task.id } });4. Use Appropriate Refresh Intervals
# ✅ Good - For high-frequency task creation
TASK_LIST_REFRESH_INTERVAL=60 # 1 minute
# ✅ Good - For normal usage
TASK_LIST_REFRESH_INTERVAL=1800 # 30 minutes
# ⚠️ Maximum enforced - Longest allowed interval
TASK_LIST_REFRESH_INTERVAL=10800 # 3 hours (system maximum)
# ❌ Bad - Will be capped at 3 hours
TASK_LIST_REFRESH_INTERVAL=14400 # 4 hours (automatically reduced to 3 hours)5. Implement Proper Error Handling
try {
await processTask(task);
await TasksModel.update({ task_status: 'Completed' }, { where: { id: task.id } });
} catch (error) {
await TasksModel.update(
{
task_status: 'Error',
error_message: error.message,
},
{ where: { id: task.id } },
);
throw error; // Re-throw to trigger finally block
}Troubleshooting
Common Issues
1. Tasks Not Appearing in Cache
Symptoms: getNotStartedTasks() returns empty array despite having "Not started" tasks in database.
Solutions:
import { ItemsService, TasksService, getCacheService } from '@baasix/baasix';
const tasksService = new TasksService();
// Check service initialization
const stats = await tasksService.getCacheStats();
console.log('Service initialized:', stats.initialized);
console.log('Task time window:', stats.taskTimeWindow);
// Force refresh cache
await tasksService.forceRefresh();
// Check database directly using ItemsService
const tasksItemsService = new ItemsService('baasix_Tasks', {
accountability: { bypassPermissions: true },
});
const fourHoursFromNow = new Date();
fourHoursFromNow.setHours(fourHoursFromNow.getHours() + 4);
const dbTasks = await tasksItemsService.readByQuery({
filter: {
task_status: { eq: 'Not started' },
scheduled_time: { lte: fourHoursFromNow.toISOString() },
},
});
console.log('Tasks in DB (within 4 hours):', dbTasks.data.length);2. Tasks Stuck in "Running" State
Symptoms: Tasks remain in "Running" status and isTaskRunning() always returns true.
Solutions:
import { ItemsService, TasksService } from '@baasix/baasix';
const tasksService = new TasksService();
// Reset running state manually
await tasksService.setTaskRunning(false);
// Check for tasks stuck in running state
const tasksItemsService = new ItemsService('baasix_Tasks', {
accountability: { bypassPermissions: true },
});
const runningTasks = await tasksItemsService.readByQuery({
filter: { task_status: { eq: 'Running' } },
});
// Reset stuck tasks if needed
await tasksItemsService.updateByQuery(
{ task_status: { eq: 'Running' } },
{ task_status: 'Error', error_message: 'Manually reset' },
);3. Cache Not Refreshing
Symptoms: Cache doesn't update when tasks are created/updated/deleted.
Solutions:
import { getCacheService } from '@baasix/baasix';
// Check hook registration
console.log('TasksService hooks registered');
// Verify cache connection
const cache = getCacheService();
try {
await cache.set('test', 'value');
await cache.get('test');
console.log('Cache connection working');
} catch (error) {
console.error('Cache connection failed:', error);
}Debugging Tips
Enable Debug Logging
LOG_LEVEL=debugMonitor Cache Stats
setInterval(async () => {
const stats = await tasksService.getCacheStats();
console.log('TasksService Stats:', stats);
}, 30000); // Every 30 secondsCheck Service Health
// Add to your monitoring endpoint
app.get('/health/tasks', async (req, res) => {
try {
const stats = await tasksService.getCacheStats();
const isHealthy = stats.initialized && !stats.error;
res.status(isHealthy ? 200 : 500).json({
healthy: isHealthy,
stats,
});
} catch (error) {
res.status(500).json({
healthy: false,
error: error.message,
});
}
});Related Documentation
- Baasix Extensions - Creating custom extensions
- Hooks System - Understanding the hooks system
- Additional Features - Cache management
- Database Schema - Understanding the data model