Baasix

TasksService Documentation

Table of Contents

  1. Overview
  2. Architecture
  3. Environment Configuration
  4. API Reference
  5. Usage in Extensions
  6. Best Practices
  7. Troubleshooting

Overview

The TasksService is a built-in service that manages background tasks efficiently by caching "Not started" tasks from the baasix_Tasks table and coordinating task execution across the system. It reduces database calls and ensures only one task runs at a time.

Key Features

  • Intelligent Caching: Keeps "Not started" tasks in the cache (in-memory or Redis) to minimize database queries
  • Time-Filtered Caching: Only caches tasks scheduled within 4 hours to reduce memory usage
  • Automatic Refresh: Periodically refreshes cache based on configurable intervals (max 3 hours)
  • Change Detection: Automatically invalidates cache when tasks are created, updated, or deleted
  • Task Coordination: Provides global state management to prevent concurrent task execution
  • Graceful Shutdown: Waits for running tasks to complete during server shutdown
  • Extension Integration: Easily integrates with custom extensions for task processing

Architecture

baasix_Tasks Table Structure

The TasksService works with the built-in baasix_Tasks table, which includes:

  • id: Primary key (auto-increment)
  • task_status: ENUM with values:
    • "Not started" - Tasks available for processing
    • "Running" - Currently executing tasks
    • "Completed" - Successfully finished tasks
    • "Error" - Failed tasks
  • type: String field defining the task type
  • scheduled_time: DateTime field for when the task should be executed
  • attachment_id: Optional file attachment reference
  • task_data: JSON field for task-specific data
  • result_data: JSON field for storing task results
  • error_data: JSON field for storing error information
  • Standard audit fields (userCreated, userUpdated, createdAt, updatedAt)

Service Architecture

Single Instance Mode:

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Extensions    │───▶│   TasksService   │───▶│  Cache Service  │
│                 │    │                  │    │  (Memory/Redis) │
│ - Schedule Exts │    │ - getNotStarted  │    │                 │
│ - Hook Exts     │    │ - tryAcquireLock │    │ - Task Cache    │
│ - Custom Logic  │    │ - releaseLock    │    │ - Lock State    │
└─────────────────┘    └──────────────────┘    └─────────────────┘


                       ┌─────────────────┐
                       │ baasix_Tasks    │
                       │ PostgreSQL      │
                       └─────────────────┘

Multi-Instance Mode (with Redis):

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Instance 1     │───▶│   TasksService   │───▶│  Task Redis     │
│  Instance 2     │    │                  │    │  (Distributed)  │
│  Instance 3     │    │ - tryAcquireLock │    │                 │
│       ...       │    │ - releaseLock    │    │ - SETNX Lock    │
└─────────────────┘    │ - lockRenewal    │    │ - Auto-Renewal  │
                       └──────────────────┘    └─────────────────┘


                       ┌─────────────────┐
                       │ baasix_Tasks    │
                       │ PostgreSQL      │
                       └─────────────────┘

When TASK_REDIS_ENABLED=true, only one instance can hold the task processing lock at a time, preventing duplicate task execution across instances.
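
The single-holder behaviour described above can be illustrated with an in-memory stand-in for a Redis "set if not exists, with expiry" lock. This is a sketch only: the InMemoryLock class and its method names are hypothetical, and the actual Baasix lock implementation is not shown in this document.

```javascript
// Hypothetical in-memory model of a "set if not exists, with expiry" lock.
// It mimics the semantics of Redis SET ... NX PX, not Baasix internals.
class InMemoryLock {
  constructor() {
    this.holder = null; // id of the instance that holds the lock
    this.expiresAt = 0; // epoch milliseconds at which the lock lapses
  }

  // Acquire only if the lock is free or has expired.
  tryAcquire(instanceId, ttlMs, now = Date.now()) {
    if (this.holder !== null && now < this.expiresAt) return false;
    this.holder = instanceId;
    this.expiresAt = now + ttlMs;
    return true;
  }

  // Only the current holder may extend the expiry (auto-renewal).
  renew(instanceId, ttlMs, now = Date.now()) {
    if (this.holder !== instanceId || now >= this.expiresAt) return false;
    this.expiresAt = now + ttlMs;
    return true;
  }

  // Only the current holder may release the lock.
  release(instanceId) {
    if (this.holder !== instanceId) return false;
    this.holder = null;
    this.expiresAt = 0;
    return true;
  }
}
```

The expiry matters because an instance that crashes while holding the lock never releases it; once the TTL lapses without renewal, another instance can take over.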

Environment Configuration

Configure TasksService behavior with these environment variables:

Required Variables

# Enable the task service
TASK_SERVICE_ENABLED=true

# Cache must be enabled for task caching
CACHE_ENABLED=true

Optional Variables

# Task cache refresh interval in seconds (default: 600, maximum: 10800 = 3 hours)
TASK_LIST_REFRESH_INTERVAL=600

# Maximum time to wait for running tasks during shutdown in seconds (default: 30)
TASK_SHUTDOWN_WAIT_TIME=30
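
To make the shutdown wait concrete, the following sketch polls a "is a task running?" check until it clears or the wait time elapses. The helper name waitForTaskCompletion and the polling approach are assumptions for illustration; how Baasix waits internally is not described here.

```javascript
// Hypothetical helper: poll an async "is a task running?" check until it
// returns false or the wait time elapses. Resolves true if the task finished
// in time, false if the wait was abandoned.
async function waitForTaskCompletion(isTaskRunning, waitSeconds, pollMs = 100) {
  const deadline = Date.now() + waitSeconds * 1000;
  while (await isTaskRunning()) {
    if (Date.now() >= deadline) return false; // gave up waiting
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
  return true;
}
```

With the documented API, a call might look like waitForTaskCompletion(() => tasksService.isTaskRunning(), 30).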

Multi-Instance Configuration (Redis)

For multi-instance deployments (PM2 cluster, Kubernetes, etc.), enable Redis-based distributed locking:

# Enable Redis for distributed task locking
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

Why separate Redis configuration?

Purpose | Config | Use Case
Socket.IO Clustering | SOCKET_REDIS_ENABLED + SOCKET_REDIS_URL | WebSocket scaling
Query Caching | CACHE_ADAPTER=redis + CACHE_REDIS_URL | Performance optimization
Task Locking | TASK_REDIS_ENABLED + TASK_REDIS_URL | Prevent duplicate task execution

This separation allows you to:

  • Run distributed tasks with in-memory cache
  • Use different Redis instances for different purposes
  • Enable only the Redis features you need

Example .env Configuration

Single Instance (No Redis)

TASK_SERVICE_ENABLED=true
CACHE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60

Multi-Instance (With Redis)

TASK_SERVICE_ENABLED=true
CACHE_ENABLED=true
TASK_LIST_REFRESH_INTERVAL=1800
TASK_SHUTDOWN_WAIT_TIME=60

# Distributed locking for multi-instance
TASK_REDIS_ENABLED=true
TASK_REDIS_URL=redis://localhost:6379

Important Limitations

  • Refresh Interval: Maximum allowed refresh interval is 3 hours (10800 seconds) to prevent missing tasks
  • Task Time Window: Only tasks with scheduled_time within 4 hours from current time are cached
  • Memory Optimization: Time filtering reduces memory usage by excluding far-future tasks
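
As a rough illustration of the 4-hour window, the predicate below decides whether a given scheduled_time would fall inside it. The function name is hypothetical, and treating past-due tasks as inside the window is an assumption based on the "lte" filter used in the troubleshooting query later in this document.

```javascript
const TASK_WINDOW_MS = 4 * 60 * 60 * 1000; // the documented 4-hour window

// Hypothetical predicate: would a task with this scheduled_time fall inside
// the caching window at time `now`? Past-due tasks count as inside.
function isWithinTaskWindow(scheduledTime, now = new Date()) {
  const scheduled = new Date(scheduledTime).getTime();
  if (Number.isNaN(scheduled)) return false; // unparseable date
  return scheduled <= now.getTime() + TASK_WINDOW_MS;
}
```

A task scheduled five hours ahead fails this check now but passes it an hour later, which is why the refresh interval is capped below the window length.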

API Reference

Core Methods

getNotStartedTasks()

Retrieves cached "Not started" tasks from the baasix_Tasks table that are scheduled within 4 hours.

const tasks = await tasksService.getNotStartedTasks();
// Returns: Array of task objects with task_status: "Not started" and scheduled_time within 4 hours

Returns: Promise<Array> - Array of task objects (filtered by scheduled_time)

Throws: Never throws; returns an empty array on error

Note: Only returns tasks scheduled within 4 hours of the current time
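
Because the returned list can include tasks scheduled up to 4 hours ahead, a processor may want to pick only those that are already due. The helper below is a hypothetical sketch of that selection, not part of the TasksService API.

```javascript
// Hypothetical helper: from the cached list, keep tasks whose scheduled_time
// has already passed and order them oldest first. Tasks with an unparseable
// scheduled_time are dropped.
function selectDueTasks(tasks, now = new Date()) {
  return tasks
    .filter((task) => new Date(task.scheduled_time).getTime() <= now.getTime())
    .sort(
      (a, b) =>
        new Date(a.scheduled_time).getTime() - new Date(b.scheduled_time).getTime(),
    );
}
```

A caller could pass the result of getNotStartedTasks() through selectDueTasks() and process the first element of the filtered list.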

setTaskRunning(isRunning)

Sets the global task running state to coordinate task execution.

await tasksService.setTaskRunning(true); // Mark task as running
await tasksService.setTaskRunning(false); // Mark task as not running

Parameters:

  • isRunning (boolean): true to mark a task as running, false when done

Returns: Promise<void>

isTaskRunning()

Checks if any task is currently running in the system.

const isRunning = await tasksService.isTaskRunning();
if (isRunning) {
  console.log('A task is already running');
  return; // Skip task execution
}

Returns: Promise<boolean> - true if a task is running, false otherwise

forceRefresh()

Manually triggers a cache refresh, useful for testing or immediate updates.

await tasksService.forceRefresh();
console.log('Cache refreshed manually');

Returns: Promise<void>

getCacheStats()

Returns statistics about the cache and service status.

const stats = await tasksService.getCacheStats();
console.log(stats);
// {
//   cachedTasksCount: 5,
//   isTaskRunning: false,
//   refreshInterval: 600000,
//   refreshIntervalSeconds: 600,
//   maxRefreshIntervalSeconds: 10800,
//   taskTimeWindow: "4 hours",
//   initialized: true,
//   lastRefreshed: "2023-12-01T10:30:00.000Z"
// }

Returns: Promise<Object> - Cache statistics object
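
One way to use these statistics is a staleness check that compares lastRefreshed against the configured interval. The sketch below assumes the field names shown in the sample output above; the function name and the factor-of-two margin are illustrative choices, not part of Baasix.

```javascript
// Hypothetical check: treat the cache as stale when the service is not
// initialized, has never refreshed, or the last refresh is older than twice
// the configured interval (refreshInterval is in milliseconds).
function isCacheStale(stats, now = new Date()) {
  if (!stats.initialized || !stats.lastRefreshed) return true;
  const ageMs = now.getTime() - new Date(stats.lastRefreshed).getTime();
  return ageMs > 2 * stats.refreshInterval;
}
```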

Usage in Extensions

Schedule Extension Example

Create a schedule extension that processes tasks periodically:

// extensions/baasix-schedule-task-processor/index.js
import schedule from 'node-schedule';
import { TasksService, ItemsService } from '@baasix/baasix';

const tasksService = new TasksService();

// Run every 5 minutes
const job = schedule.scheduleJob('*/5 * * * *', async function () {
  console.log('Task processor started');

  // Check if another task is already running
  if (await tasksService.isTaskRunning()) {
    console.log('Task already running, skipping...');
    return;
  }

  // Mark task as running
  await tasksService.setTaskRunning(true);

  try {
    // Get available tasks
    const tasks = await tasksService.getNotStartedTasks();

    if (tasks.length === 0) {
      console.log('No tasks to process');
      return;
    }

    console.log(`Processing ${tasks.length} tasks`);

    // Process first available task
    const task = tasks[0];
    await processTask(task);
  } catch (error) {
    console.error('Task processing error:', error);
  } finally {
    // Always mark task as not running
    await tasksService.setTaskRunning(false);
  }
});

async function processTask(task) {
  // Use ItemsService for task updates
  const tasksItemsService = new ItemsService('baasix_Tasks', {
    accountability: { bypassPermissions: true },
  });

  try {
    // Update task status to Running
    await tasksItemsService.updateOne(task.id, {
      task_status: 'Running',
    });

    // Process task based on task.type
    switch (task.type) {
      case 'email_batch':
        await processBatchEmails(task);
        break;
      case 'data_export':
        await processDataExport(task);
        break;
      case 'cleanup':
        await processCleanup(task);
        break;
      default:
        throw new Error(`Unknown task type: ${task.type}`);
    }

    // Mark task as completed
    await tasksItemsService.updateOne(task.id, {
      task_status: 'Completed',
    });

    console.log(`Task ${task.id} completed successfully`);
  } catch (error) {
    // Mark task as error; details go in the JSON error_data field
    await tasksItemsService.updateOne(task.id, {
      task_status: 'Error',
      error_data: { message: error.message },
    });

    console.error(`Task ${task.id} failed:`, error);
    throw error;
  }
}

export default job;

Hook Extension Example

Create a hook that creates tasks based on certain events:

// extensions/baasix-hook-task-creator/index.js
import { ItemsService, TasksService } from '@baasix/baasix';

const tasksService = new TasksService();

export default (hooksManager, context) => {
  // Create a task when a large data import is completed
  hooksManager.registerHook('data_imports', 'items.create.after', async ({ data }) => {
    if (data.status === 'completed' && data.record_count > 1000) {
      // Use ItemsService to create tasks
      const tasksItemsService = new ItemsService('baasix_Tasks', {
        accountability: { bypassPermissions: true },
      });

      // Create a cleanup task
      await tasksItemsService.createOne({
        type: 'cleanup_temp_files',
        task_status: 'Not started',
        // task_data is the JSON field for task-specific data
        task_data: {
          import_id: data.id,
          temp_files: data.temp_files,
        },
      });

      console.log(`Created cleanup task for import ${data.id}`);

      // Optionally force refresh cache for immediate availability
      await tasksService.forceRefresh();
    }

    return data;
  });
};

Best Practices

1. Always Use try/finally for Task State

// ✅ Good - Always reset running state
await tasksService.setTaskRunning(true);
try {
  // Process tasks
} finally {
  await tasksService.setTaskRunning(false);
}

// ❌ Bad - State might not be reset on error
await tasksService.setTaskRunning(true);
// Process tasks
await tasksService.setTaskRunning(false);

2. Check Running State Before Starting Tasks

// ✅ Good - Check before starting
if (await tasksService.isTaskRunning()) {
  return; // Skip if already running
}

// ❌ Bad - Multiple tasks might run concurrently
// Start processing without checking
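
Practices 1 and 2 can be combined into one small wrapper. The sketch below is hypothetical (runExclusively is not a Baasix function) and relies only on the isTaskRunning() and setTaskRunning() methods documented in the API Reference.

```javascript
// Hypothetical wrapper combining practices 1 and 2. `tasksService` is any
// object exposing isTaskRunning() and setTaskRunning().
async function runExclusively(tasksService, work) {
  if (await tasksService.isTaskRunning()) {
    return { ran: false }; // another task holds the running flag
  }
  await tasksService.setTaskRunning(true);
  try {
    return { ran: true, result: await work() };
  } finally {
    await tasksService.setTaskRunning(false); // reset even if work() throws
  }
}
```

The check and the set are two separate calls, so this pattern alone does not close the window between them; multi-instance deployments rely on the Redis-based lock described under Multi-Instance Configuration.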

3. Handle Task Status Updates Properly

// ✅ Good - Update task status in database
// tasksItemsService is an ItemsService for 'baasix_Tasks', created as in the schedule extension example
await tasksItemsService.updateOne(task.id, { task_status: 'Running' });

// Process task...

await tasksItemsService.updateOne(task.id, { task_status: 'Completed' });

4. Use Appropriate Refresh Intervals

# ✅ Good - For high-frequency task creation
TASK_LIST_REFRESH_INTERVAL=60  # 1 minute

# ✅ Good - For normal usage
TASK_LIST_REFRESH_INTERVAL=1800  # 30 minutes

# ⚠️ Maximum enforced - Longest allowed interval
TASK_LIST_REFRESH_INTERVAL=10800  # 3 hours (system maximum)

# ❌ Bad - Will be capped at 3 hours
TASK_LIST_REFRESH_INTERVAL=14400  # 4 hours (automatically reduced to 3 hours)
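
The capping rule can be expressed as a small function. This is a sketch under assumptions: the function name is hypothetical, and falling back to the default for missing or invalid values is a guess at sensible behaviour rather than documented Baasix behaviour.

```javascript
const DEFAULT_REFRESH_SECONDS = 600; // documented default
const MAX_REFRESH_SECONDS = 10800; // documented maximum (3 hours)

// Hypothetical parser: turn the raw env value into an effective interval.
// Missing, non-numeric, or non-positive values fall back to the default
// (an assumption); values above the maximum are capped.
function resolveRefreshInterval(rawValue) {
  const parsed = Number.parseInt(rawValue, 10);
  if (!Number.isFinite(parsed) || parsed <= 0) return DEFAULT_REFRESH_SECONDS;
  return Math.min(parsed, MAX_REFRESH_SECONDS);
}
```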

5. Implement Proper Error Handling

try {
  await processTask(task);
  await tasksItemsService.updateOne(task.id, { task_status: 'Completed' });
} catch (error) {
  // Record the failure in the JSON error_data field
  await tasksItemsService.updateOne(task.id, {
    task_status: 'Error',
    error_data: { message: error.message },
  });
  throw error; // Re-throw so the caller can log it; any surrounding finally block still runs
}
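
Since error_data is a JSON field, it can hold more than the message. The helper below is a hypothetical sketch that turns any thrown value into a plain, JSON-safe object; the property names are illustrative, not a Baasix convention.

```javascript
// Hypothetical helper: convert any thrown value into a plain object that can
// be stored in the JSON error_data field.
function toErrorData(error) {
  if (error instanceof Error) {
    return { name: error.name, message: error.message, stack: error.stack };
  }
  // Non-Error values (strings, numbers, ...) are stringified.
  return { name: 'NonErrorThrown', message: String(error) };
}
```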

Troubleshooting

Common Issues

1. Tasks Not Appearing in Cache

Symptoms: getNotStartedTasks() returns an empty array even though the database contains "Not started" tasks.

Solutions:

import { ItemsService, TasksService } from '@baasix/baasix';

const tasksService = new TasksService();

// Check service initialization
const stats = await tasksService.getCacheStats();
console.log('Service initialized:', stats.initialized);
console.log('Task time window:', stats.taskTimeWindow);

// Force refresh cache
await tasksService.forceRefresh();

// Check database directly using ItemsService
const tasksItemsService = new ItemsService('baasix_Tasks', {
  accountability: { bypassPermissions: true },
});

const fourHoursFromNow = new Date();
fourHoursFromNow.setHours(fourHoursFromNow.getHours() + 4);

const dbTasks = await tasksItemsService.readByQuery({
  filter: {
    task_status: { eq: 'Not started' },
    scheduled_time: { lte: fourHoursFromNow.toISOString() },
  },
});
console.log('Tasks in DB (within 4 hours):', dbTasks.data.length);

2. Tasks Stuck in "Running" State

Symptoms: Tasks remain in "Running" status and isTaskRunning() always returns true.

Solutions:

import { ItemsService, TasksService } from '@baasix/baasix';

const tasksService = new TasksService();

// Reset running state manually
await tasksService.setTaskRunning(false);

// Check for tasks stuck in running state
const tasksItemsService = new ItemsService('baasix_Tasks', {
  accountability: { bypassPermissions: true },
});

const runningTasks = await tasksItemsService.readByQuery({
  filter: { task_status: { eq: 'Running' } },
});

// Reset stuck tasks if needed
await tasksItemsService.updateByQuery(
  { task_status: { eq: 'Running' } },
  { task_status: 'Error', error_data: { message: 'Manually reset' } },
);
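
Resetting every "Running" task can also hit a task that is legitimately in progress. A narrower approach is to reset only tasks that have not been updated for some time. The filter below is a hypothetical sketch that relies on the updatedAt audit field; the threshold is left to the operator.

```javascript
// Hypothetical filter: tasks marked "Running" whose updatedAt is older than
// maxAgeMs are likely stuck.
function findStaleRunningTasks(tasks, maxAgeMs, now = new Date()) {
  return tasks.filter(
    (task) =>
      task.task_status === 'Running' &&
      now.getTime() - new Date(task.updatedAt).getTime() > maxAgeMs,
  );
}
```

The resulting list could then be reset one task at a time with updateOne, as in the schedule extension example.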

3. Cache Not Refreshing

Symptoms: Cache doesn't update when tasks are created/updated/deleted.

Solutions:

import { getCacheService } from '@baasix/baasix';

// Check hook registration
console.log('TasksService hooks registered');

// Verify cache connection with a write/read round trip
const cache = getCacheService();
try {
  await cache.set('test', 'value');
  const value = await cache.get('test');
  console.log('Cache round trip:', value === 'value' ? 'working' : 'unexpected value');
} catch (error) {
  console.error('Cache connection failed:', error);
}

Debugging Tips

Enable Debug Logging

LOG_LEVEL=debug

Monitor Cache Stats

setInterval(async () => {
  const stats = await tasksService.getCacheStats();
  console.log('TasksService Stats:', stats);
}, 30000); // Every 30 seconds

Check Service Health

// Add to your monitoring endpoint
app.get('/health/tasks', async (req, res) => {
  try {
    const stats = await tasksService.getCacheStats();
    const isHealthy = stats.initialized && !stats.error;

    res.status(isHealthy ? 200 : 500).json({
      healthy: isHealthy,
      stats,
    });
  } catch (error) {
    res.status(500).json({
      healthy: false,
      error: error.message,
    });
  }
});
