
TasksService Documentation

← Back to Documentation Home

Table of Contents

  1. Overview
  2. Architecture
  3. Environment Configuration
  4. API Reference
  5. Usage in Extensions
  6. Best Practices
  7. Troubleshooting

Overview

The TasksService is a built-in service that manages background tasks efficiently by caching "Not started" tasks from the baasix_Tasks table and coordinating task execution across the system. It reduces database calls and provides atomic task claiming, distributed job locking, stall recovery, and configurable concurrency.

Key Features

  • Intelligent Caching: Keeps "Not started" tasks in Redis cache to minimize database queries
  • Time-Filtered Caching: Only caches tasks scheduled within 4 hours to reduce memory usage
  • Automatic Refresh: Periodically refreshes cache based on configurable intervals (max 3 hours)
  • Change Detection: Automatically invalidates cache when tasks are created, updated, or deleted
  • Atomic Task Claiming: claimTask() uses atomic DB updates to prevent duplicate processing
  • Distributed Job Locking: acquireJobLock() / releaseJobLock() for locking any scheduled job across instances
  • Concurrency Control: Configurable TASK_CONCURRENCY for parallel task processing per instance
  • Stall Recovery: Automatically recovers tasks stuck in "Running" state with optional retry
  • Graceful Shutdown: Waits for running tasks to complete, releases all locks during shutdown
  • Extension Integration: Available in extensions via context and direct import

Architecture

baasix_Tasks Table Structure

The TasksService works with the built-in baasix_Tasks table, which includes:

  • id: Primary key (auto-increment)
  • task_status: ENUM with values:
    • "Not started" - Tasks available for processing
    • "Running" - Currently executing tasks
    • "Completed" - Successfully finished tasks
    • "Error" - Failed tasks
  • type: String field defining the task type
  • scheduled_time: DateTime field for when the task should be executed
  • started_at: DateTime field auto-set when task starts running (used for stall detection)
  • max_retries: Integer field for maximum retry attempts (default: 0)
  • retry_count: Integer field tracking retry attempts (system-generated)
  • attachment_id: Optional file attachment reference
  • task_data: JSON field for task-specific data
  • result_data: JSON field for storing task results
  • error_data: JSON field for storing error information
  • Standard audit fields (userCreated, userUpdated, createdAt, updatedAt)
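As a quick illustration, a row with these fields might look like the following object (the values and the email_batch type are made up for this example):

```javascript
// Illustrative baasix_Tasks row — field names from the list above, values hypothetical
const exampleTask = {
  id: 42,
  task_status: 'Not started', // "Not started" | "Running" | "Completed" | "Error"
  type: 'email_batch',
  scheduled_time: '2024-06-01T09:00:00.000Z',
  started_at: null,           // auto-set when the task starts running (stall detection)
  max_retries: 3,             // retry up to 3 times if the task stalls
  retry_count: 0,             // system-generated
  attachment_id: null,        // optional file attachment reference
  task_data: { recipient_count: 150 },
  result_data: null,
  error_data: null,
};
```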

Service Architecture

Single Instance Mode:

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Extensions    │───▶│   TasksService   │───▶│  Cache Service  │
│                 │    │                  │    │  (Memory/Redis) │
│ - Schedule Exts │    │ - claimTask      │    │                 │
│ - Hook Exts     │    │ - acquireJobLock │    │ - Task Cache    │
│ - Custom Logic  │    │ - releaseJobLock │    │ - Lock State    │
└─────────────────┘    └──────────────────┘    └─────────────────┘


                       ┌─────────────────┐
                       │ baasix_Tasks    │
                       │ PostgreSQL      │
                       └─────────────────┘

Multi-Instance Mode (with Redis):

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Instance 1     │───▶│   TasksService   │───▶│  Task Redis     │
│  Instance 2     │    │                  │    │  (Distributed)  │
│  Instance 3     │    │ - claimTask      │    │                 │
│       ...       │    │ - acquireJobLock │    │ - SETNX Lock    │
└─────────────────┘    │ - releaseJobLock │    │ - Auto-Renewal  │
                       │ - stallRecovery  │    │ - Job Locks     │
                       └──────────────────┘    └─────────────────┘


                       ┌─────────────────┐
                       │ baasix_Tasks    │
                       │ PostgreSQL      │
                       └─────────────────┘

When TASK_REDIS_ENABLED=true, distributed locking ensures only one instance processes tasks at a time, and acquireJobLock() prevents the same scheduled job from running on multiple instances simultaneously.
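The lock semantics can be pictured with a minimal in-memory sketch. This is a simplification for illustration only: the real service uses Redis SET NX EX with auto-renewal in distributed mode, while this version only guards against re-entry within a single process.

```javascript
// Minimal in-memory sketch of the job-lock semantics (illustrative, not the actual implementation)
class InMemoryJobLocks {
  constructor() {
    this.locks = new Map(); // jobName -> expiry timestamp (ms)
  }

  // Acquire succeeds only if no unexpired lock exists for this job name
  acquire(jobName, ttlSeconds = 300) {
    const now = Date.now();
    const expiry = this.locks.get(jobName);
    if (expiry !== undefined && expiry > now) return false; // still held
    this.locks.set(jobName, now + ttlSeconds * 1000);
    return true;
  }

  release(jobName) {
    return this.locks.delete(jobName);
  }
}
```

The TTL mirrors the Redis EX option: if the holder crashes, the lock expires on its own and another instance can take over.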

Environment Configuration

Configure TasksService behavior with these environment variables:

Required Variables

# Enable the task service
TASK_SERVICE_ENABLED=true

Note: TasksService uses the system cache which is always initialized with the configured adapter (memory by default, or Redis/Upstash if SYSTEM_CACHE_ADAPTER is set). The DATA_CACHE_ENABLED environment variable only affects query caching, not task caching.

Optional Variables

# Task cache refresh interval in seconds (default: 600, maximum: 10800 = 3 hours)
TASK_LIST_REFRESH_INTERVAL=600

# Maximum time to wait for running tasks during shutdown in seconds (default: 30)
TASK_SHUTDOWN_WAIT_TIME=30

# Maximum concurrent tasks per instance (default: 1)
TASK_CONCURRENCY=1

# Seconds before a Running task is considered stalled (default: 300, minimum: 60)
# Stalled tasks are auto-recovered (retried if max_retries > 0, else marked Error)
TASK_STALL_TIMEOUT=300

Multi-Instance Configuration (Redis)

For multi-instance deployments (PM2 cluster, Kubernetes, etc.), enable Redis-based distributed locking:

# Enable Redis for distributed task locking
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

Why separate Redis configurations?

| Purpose | Config | Use Case |
| --- | --- | --- |
| Socket.IO Clustering | SOCKET_REDIS_ENABLED + SOCKET_REDIS_URL | WebSocket scaling across instances |
| System Cache | SYSTEM_CACHE_ADAPTER=redis + SYSTEM_CACHE_REDIS_URL | Shared system cache (tasks, permissions, settings) |
| Data Cache | DATA_CACHE_ENABLED=true + DATA_CACHE_ADAPTER=redis | Cache database query results |
| Task Locking | TASK_REDIS_ENABLED + TASK_REDIS_URL | Prevent duplicate task execution |

This separation allows you to:

  • Run distributed tasks with in-memory cache (default)
  • Share task cache across instances using SYSTEM_CACHE_ADAPTER=redis
  • Use different Redis instances for different purposes
  • Enable only the Redis features you need

Important: TASK_REDIS_ENABLED is for distributed locking only (ensuring one instance processes tasks). For shared task cache across instances, set SYSTEM_CACHE_ADAPTER=redis.

Example .env Configuration

Single Instance (Memory Cache)

TASK_SERVICE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60
TASK_CONCURRENCY=3
TASK_STALL_TIMEOUT=300

Multi-Instance (With Redis for Distributed Locking)

TASK_SERVICE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60

# Distributed locking for multi-instance (prevents duplicate task execution)
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

With Redis Cache Adapter (Optional)

If you want task caching to use Redis instead of memory (useful for shared cache across instances):

TASK_SERVICE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800

# Use Redis for the system cache adapter (affects all internal caching including tasks)
SYSTEM_CACHE_ADAPTER=redis
SYSTEM_CACHE_REDIS_URL=redis://localhost:6379

# Distributed locking (separate from cache)
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

Important Limitations

  • Refresh Interval: Maximum allowed refresh interval is 3 hours (10800 seconds) to prevent missing tasks
  • Task Time Window: Only tasks with scheduled_time within 4 hours from current time are cached
  • Memory Optimization: Time filtering reduces memory usage by excluding far-future tasks
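The 4-hour window amounts to a simple time comparison. A sketch of the filter (illustrative only, not the actual implementation):

```javascript
// Illustrative sketch of the 4-hour caching window described above
const FOUR_HOURS_MS = 4 * 60 * 60 * 1000;

// A task qualifies for the cache if it is due now or within the next 4 hours
function isWithinCacheWindow(scheduledTime, now = new Date()) {
  return new Date(scheduledTime).getTime() <= now.getTime() + FOUR_HOURS_MS;
}
```

A task scheduled 5 hours out is skipped now but picked up by a later periodic refresh, which is why the refresh interval is capped at 3 hours (below the 4-hour window).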

API Reference

Task Claiming

claimTask(taskId)

Atomically claim a task for processing. Uses UPDATE ... WHERE task_status = 'Not started' to prevent duplicate processing across instances. If another worker already claimed the task, returns null.

const claimed = await tasksService.claimTask(taskId);
if (!claimed) {
  console.log('Task already claimed by another worker');
  return;
}
// Process the claimed task...

Parameters:

  • taskId (string | number): The ID of the task to claim

Returns: Promise<BackgroundTask | null> - The claimed task record, or null if already claimed/not found
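Conceptually, the claim is a compare-and-set on task_status. This single-process sketch shows why a second claim returns null; the real service performs the same check-and-update atomically in SQL:

```javascript
// Conceptual sketch of atomic claiming (illustrative — the real check runs as one SQL UPDATE)
function simulateClaim(task) {
  // The claim only succeeds while the status is still "Not started"
  if (task.task_status !== 'Not started') return null; // another worker already won
  task.task_status = 'Running';
  task.started_at = new Date().toISOString();
  return task;
}
```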

Task List

getNotStartedTasks()

Retrieves cached "Not started" tasks from the baasix_Tasks table that are scheduled within 4 hours.

const tasks = await tasksService.getNotStartedTasks();
// Returns: Array of task objects with task_status: "Not started" and scheduled_time within 4 hours

Returns: Promise<BackgroundTask[]> - Array of task objects (filtered by scheduled_time)
Throws: Never throws; returns an empty array on error
Note: Only returns tasks scheduled within 4 hours of the current time

Distributed Job Locking

acquireJobLock(jobName, ttlSeconds?)

Acquire a named distributed lock for a scheduled job. Prevents the same job from running on multiple instances simultaneously.

  • With Redis (TASK_REDIS_ENABLED=true): uses SET NX EX for cross-instance locking
  • Without Redis: uses in-memory set (prevents re-entry within same process)

const locked = await tasksService.acquireJobLock('attendance-cron', 900);
if (!locked) {
  console.log('Job already running on another instance');
  return;
}
try {
  // Process job...
} finally {
  await tasksService.releaseJobLock('attendance-cron');
}

Parameters:

  • jobName (string): Unique job identifier (e.g., "attendance-cron", "cleanup-job")
  • ttlSeconds (number, optional): Lock TTL in seconds. Should be >= your job's max execution time. Default: 300

Returns: Promise<boolean> - true if lock acquired, false if already held

releaseJobLock(jobName)

Release a named job lock. Only releases if this instance owns the lock (atomic via Lua script in Redis mode).

await tasksService.releaseJobLock('attendance-cron');

Parameters:

  • jobName (string): The job name used in acquireJobLock()

Returns: Promise<boolean> - true if released, false if not owned or error
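The ownership check can be pictured with this single-process sketch. It is illustrative only: in Redis mode the same check-and-delete runs atomically inside a Lua script, so no other instance can slip in between the read and the delete.

```javascript
// Illustrative owner-checked release (the real Redis version does this atomically via Lua)
function releaseIfOwner(locks, jobName, instanceId) {
  // Only the instance that acquired the lock may delete it;
  // this prevents releasing a lock that expired and was re-acquired elsewhere
  if (locks.get(jobName) === instanceId) {
    locks.delete(jobName);
    return true;
  }
  return false;
}
```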

Instance Lock (Task Processing)

tryAcquireLock(lockTimeout?)

Acquire a task processing slot. Respects TASK_CONCURRENCY — allows up to N concurrent tasks per instance. In multi-instance mode, only one instance can hold the processing lock at a time.

const acquired = await tasksService.tryAcquireLock();
if (!acquired) return;

try {
  // Process tasks...
} finally {
  await tasksService.releaseLock();
}

Parameters:

  • lockTimeout (number, optional): Redis lock TTL in seconds (default: 60)

Returns: Promise<boolean> - true if slot acquired, false if at capacity or lock held by another instance

releaseLock()

Release a task processing slot. When all slots are released, the instance lock (Redis) is also released.

Returns: Promise<boolean> - true if released
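Together, tryAcquireLock() and releaseLock() behave like a counting gate capped at TASK_CONCURRENCY. A minimal sketch of that semantics (an illustration under the assumption of a simple slot counter, not the actual implementation):

```javascript
// Illustrative concurrency gate mirroring TASK_CONCURRENCY semantics
class TaskSlots {
  constructor(concurrency = 1) {
    this.concurrency = concurrency;
    this.running = 0; // currently occupied slots
  }

  // Grant a slot only while below the configured concurrency
  tryAcquire() {
    if (this.running >= this.concurrency) return false; // at capacity
    this.running += 1;
    return true;
  }

  release() {
    if (this.running > 0) this.running -= 1;
    return true;
  }
}
```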

Legacy Methods (Deprecated)

setTaskRunning(isRunning) (deprecated)

Use tryAcquireLock() / releaseLock() instead. Still functional — delegates to the new methods internally.

// ❌ Deprecated
await tasksService.setTaskRunning(true);
// ✅ Preferred
const acquired = await tasksService.tryAcquireLock();

isTaskRunning() (deprecated)

Returns true when running task count >= configured TASK_CONCURRENCY. Still functional.

Utility Methods

forceRefresh()

Manually triggers a cache refresh, useful for testing or immediate updates.

await tasksService.forceRefresh();

Returns: Promise<void>

getCacheStats()

Returns statistics about the cache and service status.

const stats = await tasksService.getCacheStats();
// {
//   cachedTasksCount: 5,
//   runningCount: 0,
//   concurrency: 3,
//   isAtCapacity: false,
//   stallTimeout: 300,
//   refreshInterval: 600000,
//   initialized: true,
//   distributedMode: true,
//   hasInstanceLock: false,
//   instanceId: "a1b2c3d4"
// }

Returns: Promise<Object> - Cache statistics object

recoverStalledTasks()

Recover tasks stuck in "Running" state beyond the stall timeout. Called automatically during initialization and each periodic cache refresh.

  • If retry_count < max_retries: resets to "Not started" for automatic retry
  • Otherwise: marks as "Error" with stall information
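The recovery decision above can be sketched as follows (illustrative logic only; the actual implementation runs against the database):

```javascript
// Illustrative stall-recovery decision (mirrors the rules listed above)
function recoveryAction(task, stallTimeoutSeconds, now = Date.now()) {
  const runningForMs = now - new Date(task.started_at).getTime();
  if (runningForMs < stallTimeoutSeconds * 1000) return 'leave'; // not stalled yet
  // Stalled: retry if attempts remain, otherwise mark as Error
  return task.retry_count < task.max_retries ? 'retry' : 'error';
}
```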

Usage in Extensions

Scenario 1: Processing Background Tasks with Atomic Claiming

Use claimTask() for safe per-task processing that works across multiple instances:

// extensions/baasix-schedule-task-processor/index.js
import schedule from 'node-schedule';

export default async function (hooksManager, context) {
  const { tasksService, ItemsService } = context;

  // Run every minute
  schedule.scheduleJob('* * * * *', async function () {
    const tasks = await tasksService.getNotStartedTasks();
    const taskService = new ItemsService('baasix_Tasks');

    for (const task of tasks) {
      // Atomic claim — returns null if already taken by another worker
      const claimed = await tasksService.claimTask(task.id);
      if (!claimed) continue;

      try {
        if (claimed.type === 'email_batch') {
          await processBatchEmails(claimed);
        } else if (claimed.type === 'data_export') {
          await processDataExport(claimed);
        }

        await taskService.updateOne(task.id, {
          task_status: 'Completed',
          result_data: { processed_at: new Date().toISOString() },
        });
      } catch (error) {
        await taskService.updateOne(task.id, {
          task_status: 'Error',
          error_data: { message: error.message },
        });
      }
    }
  });
}

Scenario 2: Locking a Recurring Scheduled Job

Use acquireJobLock() / releaseJobLock() for cron jobs that are NOT task-table based but need single-instance execution:

// extensions/baasix-schedule-attendance/index.js
import schedule from 'node-schedule';
import moment from 'moment';
import AttendanceUtils from './attendance-utils.js'; // this extension's helper module (illustrative path)

export default async function (hooksManager, context) {
  const { tasksService } = context;

  // Every 15 minutes: mark absent for ended classes
  schedule.scheduleJob('*/15 * * * *', async function () {
    // Acquire a named lock — only one instance runs this job
    const locked = await tasksService.acquireJobLock('attendance-cron', 900);
    if (!locked) return; // another instance is running this job

    try {
      const checkStart = moment().subtract(1, 'hours').toDate().toISOString();
      const checkEnd = moment().subtract(30, 'minutes').toDate().toISOString();

      await AttendanceUtils.ProcessScheduleAttendance({
        endtime: { between: [checkStart, checkEnd] },
        attendance_enabled: { ne: false },
      });

      await AttendanceUtils.ScheduleAttendanceCalculation();
      await AttendanceUtils.CalculateConsecutiveDaysAttendance();
    } catch (error) {
      console.error('Attendance cron error:', error);
    } finally {
      await tasksService.releaseJobLock('attendance-cron');
    }
  });
}

Scenario 3: Using Instance Lock for Sequential Processing

Use tryAcquireLock() / releaseLock() when you want the legacy single-lock gate pattern:

// extensions/baasix-schedule-task-processor/index.js
import schedule from 'node-schedule';
export default async function (hooksManager, context) {
  const { tasksService, ItemsService } = context;

  schedule.scheduleJob('* * * * *', async function () {
    // Only one cron tick runs at a time per instance
    const acquired = await tasksService.tryAcquireLock(60);
    if (!acquired) return;

    try {
      const tasks = await tasksService.getNotStartedTasks();
      const taskService = new ItemsService('baasix_Tasks');

      for (const task of tasks) {
        await taskService.updateOne(task.id, { task_status: 'Running' });
        // Process task...
        await taskService.updateOne(task.id, { task_status: 'Completed' });
      }
    } finally {
      await tasksService.releaseLock();
    }
  });
}

Hook Extension Example

Create tasks based on events:

// extensions/baasix-hook-task-creator/index.js
export default (hooksManager, context) => {
  const { tasksService, ItemsService } = context;

  hooksManager.registerHook('data_imports', 'items.create.after', async ({ data }) => {
    if (data.status === 'completed' && data.record_count > 1000) {
      const tasksItemsService = new ItemsService('baasix_Tasks', {
        accountability: { bypassPermissions: true },
      });

      await tasksItemsService.createOne({
        type: 'cleanup_temp_files',
        task_status: 'Not started',
        scheduled_time: new Date(),
        max_retries: 3, // Auto-retry up to 3 times if stalled
        task_data: JSON.stringify({ import_id: data.id }),
      });

      // Force refresh cache for immediate availability
      await tasksService.forceRefresh();
    }

    return data;
  });
};

Best Practices

1. Use claimTask() for Task Processing

// ✅ Best - Atomic per-task claim, safe across multiple instances
for (const task of tasks) {
  const claimed = await tasksService.claimTask(task.id);
  if (!claimed) continue;
  // Process task...
}

// ⚠️ Legacy - Check-then-act pattern has a race window
if (await tasksService.isTaskRunning()) return;
await tasksService.setTaskRunning(true);

2. Use acquireJobLock() for Non-Task Cron Jobs

// ✅ Best - Named lock, works across instances
const locked = await tasksService.acquireJobLock('my-cron-job', 600);
if (!locked) return;
try {
  /* ... */
} finally {
  await tasksService.releaseJobLock('my-cron-job');
}

// ❌ Bad - No protection against multi-instance execution
schedule.scheduleJob('*/5 * * * *', async () => {
  await doWork(); // Runs on every instance!
});

3. Set TTL >= Max Execution Time

// ✅ Good - 15min cron, lock TTL = 15min (900s)
await tasksService.acquireJobLock('cleanup', 900);

// ❌ Bad - If job takes 10min, lock expires at 5min, another instance starts
await tasksService.acquireJobLock('cleanup', 300);

4. Configure Stall Recovery with Retries

// When creating tasks, set max_retries for auto-recovery from stalls
await tasksItemsService.createOne({
  type: 'email_batch',
  task_status: 'Not started',
  scheduled_time: new Date(),
  max_retries: 3, // Auto-retry up to 3 times if stalled (based on TASK_STALL_TIMEOUT)
});

5. Use Appropriate Refresh Intervals

# ✅ Good - For high-frequency task creation
TASK_LIST_REFRESH_INTERVAL=60  # 1 minute

# ✅ Good - For normal usage
TASK_LIST_REFRESH_INTERVAL=1800  # 30 minutes

# ⚠️ Maximum enforced - Longest allowed interval
TASK_LIST_REFRESH_INTERVAL=10800  # 3 hours (system maximum)

Troubleshooting

Common Issues

1. Tasks Not Appearing in Cache

Symptoms: getNotStartedTasks() returns an empty array even though "Not started" tasks exist in the database.

Solutions:

// Check service initialization
const stats = await tasksService.getCacheStats();
console.log('Service initialized:', stats.initialized);

// Force refresh cache
await tasksService.forceRefresh();

// Verify: is the task scheduled within 4 hours?
// Only tasks with scheduled_time <= now + 4 hours are cached

2. Tasks Stuck in "Running" State

Symptoms: Tasks remain in "Running" status and are not processed.

Solutions:

The stall recovery system handles this automatically:

  • TASK_STALL_TIMEOUT (default: 300s) defines how long before a Running task is considered stalled
  • Stalled tasks with max_retries > retry_count are reset to "Not started" for retry
  • Stalled tasks with no retries left are marked as "Error"

Recovery runs during initialization and each periodic cache refresh.

# Reduce stall timeout for faster recovery
TASK_STALL_TIMEOUT=120

3. Same Job Running on Multiple Instances

Symptoms: Cron jobs execute on every instance in a multi-instance deployment.

Solutions:

// Use acquireJobLock for any cron job that should run on only one instance
const locked = await tasksService.acquireJobLock('my-job', 600);
if (!locked) return;
try {
  /* ... */
} finally {
  await tasksService.releaseJobLock('my-job');
}

Requires TASK_REDIS_ENABLED=true for cross-instance coordination.

4. Duplicate Task Processing

Symptoms: The same task is processed by multiple workers.

Solutions:

// Use claimTask() instead of manual status updates
const claimed = await tasksService.claimTask(task.id);
if (!claimed) continue; // Already claimed by another worker

// claimTask() uses atomic UPDATE ... WHERE task_status = 'Not started'
// Only one worker wins the claim

Debugging Tips

Enable Debug Logging

LOG_LEVEL=debug

Monitor Cache Stats

setInterval(async () => {
  const stats = await tasksService.getCacheStats();
  console.log('TasksService Stats:', stats);
}, 30000);

