Saga Persistence

Store and resume durable workflows with pluggable storage

Sagas need persistent storage to survive restarts and resume after waiting for external events. JustScale provides a storage abstraction with in-memory and PostgreSQL implementations, plus the ability to create custom backends.

Storage Abstraction

The SagaStorage abstract class defines the storage interface. Bind an implementation via DI to enable persistence:

setup.ts (TypeScript)
import { createFeatureBuilder, bindService } from '@justscale/core';
import {
  SagaStorage,
  InMemorySagaStorage,
  SagaStorageService,
} from '@justscale/saga';
import { PostgresSagaStorage } from '@justscale/postgres';
import { OrderSaga } from './sagas';

// For development/testing - use in-memory storage
export const DevFeature = createFeatureBuilder()
  .name('dev')
  .provides((b) => b
    .add(bindService(SagaStorage, InMemorySagaStorage))
    .add(SagaStorageService)
    .add(OrderSaga)
  );

// For production - use PostgreSQL storage
export const ProdFeature = createFeatureBuilder()
  .name('prod')
  .provides((b) => b
    .add(bindService(SagaStorage, PostgresSagaStorage))
    .add(SagaStorageService)
    .add(OrderSaga)
  );
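Binding an abstract class to a concrete one works because the abstract class doubles as a runtime lookup key. The sketch below models that idea with a hypothetical `MiniContainer` (its `bind` and `resolve` methods are illustrative, not the JustScale DI API) and stand-in `Storage` classes in place of SagaStorage:

```typescript
// Hypothetical minimal container: abstract classes act as injection tokens.
type AbstractCtor<T> = abstract new (...args: never[]) => T;
type Ctor<T> = new () => T;

class MiniContainer {
  private bindings = new Map<AbstractCtor<unknown>, Ctor<unknown>>();
  private instances = new Map<AbstractCtor<unknown>, unknown>();

  // Map an abstract token to a concrete implementation
  bind<T>(token: AbstractCtor<T>, impl: Ctor<T>): this {
    this.bindings.set(token, impl);
    return this;
  }

  // Lazily create one instance per token and reuse it afterwards
  resolve<T>(token: AbstractCtor<T>): T {
    if (!this.instances.has(token)) {
      const impl = this.bindings.get(token);
      if (!impl) throw new Error('No binding registered for token');
      this.instances.set(token, new impl());
    }
    return this.instances.get(token) as T;
  }
}

// Stand-ins for SagaStorage and two backends
abstract class Storage {
  abstract describe(): string;
}
class MemoryStorage extends Storage {
  describe(): string {
    return 'memory';
  }
}
class SqlStorage extends Storage {
  describe(): string {
    return 'sql';
  }
}

// Same token, different implementations per environment
const dev = new MiniContainer().bind(Storage, MemoryStorage);
const prod = new MiniContainer().bind(Storage, SqlStorage);
```

Code that depends only on the abstract token never changes when the backend does; only the binding differs, just as DevFeature and ProdFeature differ only in their bindService line.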

In-Memory Storage

Use InMemorySagaStorage for development and testing. It keeps sagas in process memory, so all data is lost on restart and it is not suitable for production:

in-memory.ts (TypeScript)
import {
  InMemorySagaStorage,
  createInMemorySagaStorage,
  type StoredSaga,
} from '@justscale/saga';

// Option 1: Use as a class (for DI binding)
const storage = new InMemorySagaStorage();

// Option 2: Use the factory function
// const storage = createInMemorySagaStorage();

// Storage methods (`saga` is a StoredSaga record created elsewhere)
declare const saga: StoredSaga;
await storage.save(saga);
const loaded = await storage.load(saga.workflowId);
await storage.delete(saga.workflowId);

// Query waiting sagas
const waiting = await storage.findWaitingFor('payment-confirmed');
const timedOut = await storage.findTimedOut(new Date());

// List and count
const all = await storage.list({ status: 'suspended' });
const count = await storage.count({ sagaPath: 'orders/process' });

// For testing - clear all data
storage.clear();
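To make the semantics of these queries concrete, here is a minimal Map-backed store written against a trimmed copy of the StoredSaga shape documented on this page. It is an illustrative sketch, not the source of InMemorySagaStorage; `StoredSagaLite` and `ListOptions` are hypothetical stand-ins, the latter covering only the `status` and `sagaPath` filters used here:

```typescript
// Trimmed copy of the StoredSaga fields this sketch needs (assumption)
type SagaStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed';

interface StoredSagaLite {
  workflowId: string;
  sagaPath: string;
  correlationId?: string;
  status: SagaStatus;
  waitCondition?: { event?: string; resumeAt?: Date };
}

// Hypothetical stand-in for SagaStorageOptions
interface ListOptions {
  status?: SagaStatus;
  sagaPath?: string;
}

class MapSagaStorage {
  private sagas = new Map<string, StoredSagaLite>();

  async save(saga: StoredSagaLite): Promise<void> {
    this.sagas.set(saga.workflowId, { ...saga }); // shallow copy
  }

  async load(workflowId: string): Promise<StoredSagaLite | undefined> {
    return this.sagas.get(workflowId);
  }

  async delete(workflowId: string): Promise<void> {
    this.sagas.delete(workflowId);
  }

  // Suspended sagas waiting on a signal, optionally narrowed by correlation ID
  async findWaitingFor(event: string, correlationId?: string): Promise<StoredSagaLite[]> {
    return Array.from(this.sagas.values()).filter(
      (s) =>
        s.status === 'suspended' &&
        s.waitCondition?.event === event &&
        (correlationId === undefined || s.correlationId === correlationId),
    );
  }

  // Suspended sagas whose sleep deadline is at or before `now`
  async findTimedOut(now: Date): Promise<StoredSagaLite[]> {
    return Array.from(this.sagas.values()).filter((s) => {
      const resumeAt = s.waitCondition?.resumeAt;
      return s.status === 'suspended' && resumeAt !== undefined && resumeAt.getTime() <= now.getTime();
    });
  }

  async list(options: ListOptions = {}): Promise<StoredSagaLite[]> {
    return Array.from(this.sagas.values()).filter(
      (s) =>
        (options.status === undefined || s.status === options.status) &&
        (options.sagaPath === undefined || s.sagaPath === options.sagaPath),
    );
  }

  async count(options: ListOptions = {}): Promise<number> {
    return (await this.list(options)).length;
  }

  clear(): void {
    this.sagas.clear();
  }
}
```

Linear scans are fine for tests; a persistent backend would index by status, event, and resume time instead.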

Executing Sagas with Storage

Use executeSaga and signalSaga with a container that has storage bound:

execute.ts (TypeScript)
import { executeSaga, signalSaga } from '@justscale/saga';
import { ProcessOrderSaga } from './sagas';

// Assumes `container` has SagaStorage bound and `storage` is that same SagaStorage instance

// Execute a saga (starts or resumes)
const result = await executeSaga(ProcessOrderSaga, container, {
  workflowId: 'order-123',
  body: {
    items: [{ productId: 'prod-1', quantity: 2 }],
  },
  params: { orderId: 'order-123' },
});

if (result.status === 'suspended') {
  console.log('Saga waiting for:', result.waitCondition?.event);
  // Store workflowId for later signal
}

if (result.status === 'completed') {
  console.log('Saga completed:', result.output);
}

// Later, send a signal to resume
await signalSaga(
  ProcessOrderSaga,
  container,
  storage,  // SagaStorage instance
  'order-123',
  'payment-confirmed',
  { transactionId: 'txn-456', amount: 99.99 }
);
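executeSaga either starts a workflow or resumes it, and signalSaga delivers an event to a suspended one. The sketch below models the underlying replay idea in plain TypeScript: completed steps are cached by ID, waiting on a signal that has not arrived suspends the run, and a later run replays cached results instead of re-executing them. `runSaga`, `signal`, `SagaCtx`, and `SagaRecord` are hypothetical; JustScale's engine may work differently internally.

```typescript
// Hypothetical replay engine illustrating step caching and suspension.
interface SagaRecord {
  status: 'pending' | 'suspended' | 'completed';
  steps: Map<string, unknown>;   // step ID -> cached result
  signals: Map<string, unknown>; // event name -> delivered payload
  waitingFor?: string;
  output?: unknown;
}

interface SagaCtx {
  step<T>(id: string, fn: () => Promise<T>): Promise<T>;
  waitFor<T>(event: string): Promise<T>;
}

type SagaBody = (ctx: SagaCtx) => Promise<unknown>;

// Thrown to unwind the saga body at a wait point
class Suspend {
  readonly event: string;
  constructor(event: string) {
    this.event = event;
  }
}

async function runSaga(body: SagaBody, record: SagaRecord): Promise<SagaRecord> {
  const ctx: SagaCtx = {
    async step<T>(id: string, fn: () => Promise<T>): Promise<T> {
      if (record.steps.has(id)) return record.steps.get(id) as T; // replay from cache
      const result = await fn(); // first execution runs the side effect
      record.steps.set(id, result);
      return result;
    },
    async waitFor<T>(event: string): Promise<T> {
      if (record.signals.has(event)) return record.signals.get(event) as T;
      throw new Suspend(event);
    },
  };

  try {
    record.output = await body(ctx);
    record.status = 'completed';
    record.waitingFor = undefined;
  } catch (err) {
    if (!(err instanceof Suspend)) throw err; // real failures propagate
    record.status = 'suspended';
    record.waitingFor = err.event;
  }
  return record;
}

// Deliver a signal, then re-run the saga from the top; cached steps are skipped
async function signal(body: SagaBody, record: SagaRecord, event: string, payload: unknown): Promise<SagaRecord> {
  if (record.status !== 'suspended' || record.waitingFor !== event) {
    throw new Error(`Saga is not waiting for ${event}`);
  }
  record.signals.set(event, payload);
  return runSaga(body, record);
}

// Example saga: one cached step, then a wait for payment
let reserveCalls = 0;
const processOrder: SagaBody = async (ctx) => {
  const reservation = await ctx.step('reserve-stock', async () => {
    reserveCalls += 1;
    return 'res-1';
  });
  const payment = await ctx.waitFor<{ transactionId: string }>('payment-confirmed');
  return { reservation, transactionId: payment.transactionId };
};
```

Because the body re-runs from the top on every resume, anything outside `step` executes again; that is why the side effects belong inside steps.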

HTTP Integration

The sagaStart and sagaSignal plugins handle HTTP-to-saga integration automatically:

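http-integration.ts (TypeScript)
import { createController, Post } from '@justscale/core';
import { body } from '@justscale/http';
import { sagaStart, sagaSignal } from '@justscale/saga';
import { z } from 'zod';
import { ProcessOrderSaga } from './sagas';

// Item shape matching the executeSaga example above
const ItemSchema = z.object({
  productId: z.string(),
  quantity: z.number().int().positive(),
});
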
export const OrderController = createController('/orders', {
  routes: (_, { Post }) => ({
    // Start a new order saga
    create: Post('/')
      .apply(body(z.object({
        items: z.array(ItemSchema),
      })))
      .apply(sagaStart(ProcessOrderSaga))
      .handle(({ sagaResult }) => {
        // sagaResult has workflowId, status, output/error
        if (sagaResult.status === 'suspended') {
          return {
            workflowId: sagaResult.workflowId,
            status: 'pending',
            waitingFor: sagaResult.waitCondition?.event,
          };
        }
        if (sagaResult.status === 'failed') {
          return {
            workflowId: sagaResult.workflowId,
            status: 'failed',
            error: sagaResult.error,
          };
        }
        return {
          workflowId: sagaResult.workflowId,
          status: 'completed',
          result: sagaResult.output,
        };
      }),

    // Confirm payment (sends signal)
    confirmPayment: Post('/:workflowId/confirm-payment')
      .apply(body(z.object({
        transactionId: z.string(),
        amount: z.number(),
      })))
      .apply(sagaSignal(ProcessOrderSaga, 'payment-confirmed'))
      .handle(({ signalResult }) => signalResult),
  }),
});
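Conceptually, a signal endpoint has to look up the stored saga, check that it is suspended on the expected event, and only then resume it. The framework-free sketch below shows those checks; `handleSignal`, the `load`/`resume` callbacks, and the 404/409 status codes are assumptions for illustration and do not describe what sagaSignal actually returns.

```typescript
// Hypothetical signal handler that validates state before resuming.
interface StoredLite {
  workflowId: string;
  status: 'pending' | 'running' | 'suspended' | 'completed' | 'failed';
  waitCondition?: { event?: string };
}

interface HttpResult {
  status: number;
  body: unknown;
}

async function handleSignal(
  load: (id: string) => Promise<StoredLite | undefined>,
  resume: (saga: StoredLite, payload: unknown) => Promise<string>,
  workflowId: string,
  event: string,
  payload: unknown,
): Promise<HttpResult> {
  const saga = await load(workflowId);
  if (!saga) {
    return { status: 404, body: { error: `Unknown workflow ${workflowId}` } };
  }
  // Reject signals the saga is not currently waiting for (late or duplicate webhooks)
  if (saga.status !== 'suspended' || saga.waitCondition?.event !== event) {
    return { status: 409, body: { error: `Workflow is ${saga.status}, not waiting for ${event}` } };
  }
  const newStatus = await resume(saga, payload);
  return { status: 200, body: { workflowId, status: newStatus } };
}
```

Answering a repeated signal with a conflict instead of resuming again makes retried webhooks harmless.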

Stored Saga Structure

The StoredSaga interface defines what gets persisted:

stored-saga.ts (TypeScript)
interface StoredSaga<TBody = unknown, TState = unknown> {
  // Identifiers
  workflowId: string;      // Unique execution ID
  sagaPath: string;        // Saga path (e.g., 'orders/process')
  correlationId?: string;  // Business correlation (e.g., order ID)

  // Execution state
  status: 'pending' | 'running' | 'suspended' | 'completed' | 'failed';
  body: TBody;             // Original input
  params: Record<string, string>;  // Path parameters
  state?: TState;          // User-defined state
  output?: unknown;        // Final output (if completed)
  error?: string;          // Error message (if failed)

  // Suspension info
  waitCondition?: {
    event?: string;        // Signal being waited for
    resumeAt?: Date;       // When to resume (for sleep)
  };

  // Execution history
  steps: StepRecord[];     // Completed steps with cached results
  compensations: string[]; // Registered compensation step IDs

  // Timestamps
  createdAt: Date;
  updatedAt: Date;
}
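Backends that store StoredSaga as JSON need to handle its Date fields: JSON.stringify writes them as ISO strings, and JSON.parse does not turn them back into Date objects, so a later `resumeAt.getTime()` would throw. A minimal serialize/deserialize pair, assuming the Date fields are exactly `createdAt`, `updatedAt`, and `waitCondition.resumeAt` as listed above (`SagaDates` is a trimmed, hypothetical stand-in for StoredSaga):

```typescript
interface SagaDates {
  workflowId: string;
  waitCondition?: { event?: string; resumeAt?: Date };
  createdAt: Date;
  updatedAt: Date;
}

function serializeSaga(saga: SagaDates): string {
  // Date.prototype.toJSON emits ISO-8601 strings
  return JSON.stringify(saga);
}

function deserializeSaga(json: string): SagaDates {
  const raw = JSON.parse(json);
  // Rebuild every Date field that JSON flattened to a string
  return {
    ...raw,
    createdAt: new Date(raw.createdAt),
    updatedAt: new Date(raw.updatedAt),
    waitCondition: raw.waitCondition && {
      ...raw.waitCondition,
      resumeAt: raw.waitCondition.resumeAt ? new Date(raw.waitCondition.resumeAt) : undefined,
    },
  };
}
```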

Custom Storage Backend

Implement the SagaStorage abstract class for custom backends:

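redis-storage.ts (TypeScript)
import Redis from 'ioredis';
import { SagaStorage, type StoredSaga, type SagaStorageOptions } from '@justscale/saga';

// JSON.stringify turns Dates into ISO strings; restore them on load
function parseSaga(data: string): StoredSaga {
  const saga = JSON.parse(data) as StoredSaga;
  saga.createdAt = new Date(saga.createdAt);
  saga.updatedAt = new Date(saga.updatedAt);
  if (saga.waitCondition?.resumeAt) {
    saga.waitCondition.resumeAt = new Date(saga.waitCondition.resumeAt);
  }
  return saga;
}

export class RedisSagaStorage extends SagaStorage {
  constructor(private redis: Redis) {
    super();
  }

  async save(saga: StoredSaga): Promise<void> {
    // Data keys get their own prefix so scans never hit the index keys below
    await this.redis.hset(
      `saga:data:${saga.workflowId}`,
      'data',
      JSON.stringify(saga)
    );

    // Index by correlation ID
    if (saga.correlationId) {
      await this.redis.set(
        `saga:corr:${saga.correlationId}`,
        saga.workflowId
      );
    }

    // Keep the sleep index in sync: add while sleeping, remove once resumed
    if (saga.status === 'suspended' && saga.waitCondition?.resumeAt) {
      await this.redis.zadd(
        'saga:sleeping',
        saga.waitCondition.resumeAt.getTime(),
        saga.workflowId
      );
    } else {
      await this.redis.zrem('saga:sleeping', saga.workflowId);
    }
  }

  async load(workflowId: string): Promise<StoredSaga | undefined> {
    const data = await this.redis.hget(`saga:data:${workflowId}`, 'data');
    return data ? parseSaga(data) : undefined;
  }

  async delete(workflowId: string): Promise<void> {
    const saga = await this.load(workflowId);
    await this.redis.del(`saga:data:${workflowId}`);
    await this.redis.zrem('saga:sleeping', workflowId);
    if (saga?.correlationId) {
      await this.redis.del(`saga:corr:${saga.correlationId}`);
    }
  }

  async findWaitingFor(event: string, correlationId?: string): Promise<StoredSaga[]> {
    // Scan for suspended sagas waiting for this event
    const all = await this.loadAll();
    return all.filter((saga) =>
      saga.status === 'suspended' &&
      saga.waitCondition?.event === event &&
      (!correlationId || saga.correlationId === correlationId)
    );
  }

  async findTimedOut(now: Date): Promise<StoredSaga[]> {
    const ids = await this.redis.zrangebyscore(
      'saga:sleeping',
      0,
      now.getTime()
    );

    const sagas = await Promise.all(
      ids.map((id) => this.load(id))
    );

    return sagas.filter((s): s is StoredSaga => s !== undefined);
  }

  async list(options?: SagaStorageOptions): Promise<StoredSaga[]> {
    // Full scan; a larger deployment would maintain per-status/per-path indexes
    const all = await this.loadAll();
    return all.filter((saga) =>
      (!options?.status || saga.status === options.status) &&
      (!options?.sagaPath || saga.sagaPath === options.sagaPath)
    );
  }

  async count(options?: SagaStorageOptions): Promise<number> {
    const list = await this.list(options);
    return list.length;
  }

  // KEYS blocks Redis while it walks the keyspace; prefer SCAN in production
  private async loadAll(): Promise<StoredSaga[]> {
    const keys = await this.redis.keys('saga:data:*');
    const results: StoredSaga[] = [];
    for (const key of keys) {
      const data = await this.redis.hget(key, 'data');
      if (data) results.push(parseSaga(data));
    }
    return results;
  }
}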
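When writing a custom backend, a small contract check helps confirm it behaves like the built-in ones: Dates survive a round-trip, queries filter correctly, and a resumed saga leaves the sleep index. The sketch assumes a trimmed-down interface (`MinimalStorage` and `checkStorageContract` are hypothetical) and demonstrates it against a throwaway Map-backed class; point it at your own implementation to reuse it.

```typescript
type Status = 'pending' | 'running' | 'suspended' | 'completed' | 'failed';

interface SagaRow {
  workflowId: string;
  status: Status;
  waitCondition?: { event?: string; resumeAt?: Date };
}

// Hypothetical subset of the SagaStorage contract under test
interface MinimalStorage {
  save(saga: SagaRow): Promise<void>;
  load(workflowId: string): Promise<SagaRow | undefined>;
  delete(workflowId: string): Promise<void>;
  findWaitingFor(event: string): Promise<SagaRow[]>;
  findTimedOut(now: Date): Promise<SagaRow[]>;
}

function check(cond: boolean, message: string): void {
  if (!cond) throw new Error(`Storage contract violated: ${message}`);
}

async function checkStorageContract(storage: MinimalStorage): Promise<void> {
  await storage.save({ workflowId: 'w1', status: 'suspended', waitCondition: { event: 'paid' } });
  await storage.save({ workflowId: 'w2', status: 'suspended', waitCondition: { resumeAt: new Date(1_000) } });

  const w2 = await storage.load('w2');
  check(w2?.waitCondition?.resumeAt instanceof Date, 'resumeAt must load as a Date');

  check((await storage.findWaitingFor('paid')).map((s) => s.workflowId).join() === 'w1', 'findWaitingFor');
  check((await storage.findTimedOut(new Date(500))).length === 0, 'saga reported due too early');
  check((await storage.findTimedOut(new Date(2_000))).map((s) => s.workflowId).join() === 'w2', 'due saga missing');

  // A saga that has resumed must no longer be reported as sleeping
  await storage.save({ workflowId: 'w2', status: 'completed' });
  check((await storage.findTimedOut(new Date(2_000))).length === 0, 'completed saga still reported as due');

  await storage.delete('w1');
  check((await storage.load('w1')) === undefined, 'delete');
}

// Throwaway reference implementation to demonstrate the check
class MapStorage implements MinimalStorage {
  private rows = new Map<string, SagaRow>();
  async save(saga: SagaRow) {
    this.rows.set(saga.workflowId, saga);
  }
  async load(id: string) {
    return this.rows.get(id);
  }
  async delete(id: string) {
    this.rows.delete(id);
  }
  async findWaitingFor(event: string) {
    return Array.from(this.rows.values()).filter((s) => s.status === 'suspended' && s.waitCondition?.event === event);
  }
  async findTimedOut(now: Date) {
    return Array.from(this.rows.values()).filter((s) => {
      const at = s.waitCondition?.resumeAt;
      return s.status === 'suspended' && at !== undefined && at.getTime() <= now.getTime();
    });
  }
}
```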

Resuming Sleeping Sagas

Sleeping sagas have a resumeAt timestamp. Poll for and resume them with a background worker:

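worker.ts (TypeScript)
import { executeSaga, SagaStorage } from '@justscale/saga';
import { sagaRegistry } from './sagas';

// The container type executeSaga accepts, derived so no export name is assumed
type Container = Parameters<typeof executeSaga>[1];

// Provided by your application bootstrap
declare const storage: SagaStorage;
declare const container: Container;

// Poll for sagas ready to resume
async function processSleepingSagas(
  storage: SagaStorage,
  container: Container
) {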
  const now = new Date();

  // Find sagas where sleep has elapsed
  const ready = await storage.findTimedOut(now);

  for (const saga of ready) {
    // Get the saga definition from registry
    const sagaDef = sagaRegistry[saga.sagaPath];
    if (!sagaDef) {
      console.warn(`Unknown saga: ${saga.sagaPath}`);
      continue;
    }

    try {
      // Resume execution
      const result = await executeSaga(sagaDef, container, {
        workflowId: saga.workflowId,
        body: saga.body,
        params: saga.params,
        resumeFrom: saga,  // Pass stored state for resume
      });

      console.log(`Resumed saga ${saga.workflowId}: ${result.status}`);
    } catch (err) {
      console.error(`Failed to resume ${saga.workflowId}:`, err);
    }
  }
}

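// Run every minute; skip a tick while the previous poll is still running so
// this process never resumes the same saga twice at once
let polling = false;
setInterval(async () => {
  if (polling) return;
  polling = true;
  try {
    await processSleepingSagas(storage, container);
  } catch (err) {
    console.error('Sleeping-saga poll failed:', err);
  } finally {
    polling = false;
  }
}, 60_000);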
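The worker relies on a `sagaRegistry` that maps each stored `sagaPath` back to its saga definition. How `./sagas` builds it is not shown on this page; one hedged approach is to derive the map from a list of definitions and fail fast on duplicate paths. `buildSagaRegistry`, `SagaDefLike`, and its `path` field are hypothetical stand-ins for whatever your saga objects expose.

```typescript
// Hypothetical shape: anything that knows its own saga path
interface SagaDefLike {
  path: string;
}

// Build a lookup table keyed by sagaPath, rejecting duplicates at startup
function buildSagaRegistry<T extends SagaDefLike>(defs: readonly T[]): Record<string, T> {
  // Null prototype: unknown paths such as 'toString' resolve to undefined
  const registry: Record<string, T> = Object.create(null);
  for (const def of defs) {
    if (def.path in registry) {
      throw new Error(`Duplicate saga path: ${def.path}`);
    }
    registry[def.path] = def;
  }
  return registry;
}
```

With a plain `{}` object, a stored path that happens to match an inherited property would return a function instead of undefined and skip the worker's unknown-saga warning.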

Querying Saga State

Query saga executions for monitoring, debugging, or business logic:

queries.ts (TypeScript)
// `storage` is the SagaStorage instance bound in your container

// Find by workflow ID
const saga = await storage.load('order-123');

// Find all suspended sagas
const suspended = await storage.list({
  status: 'suspended',
});

// Find sagas by path
const orderSagas = await storage.list({
  sagaPath: 'orders/process',
});

// Find sagas waiting for specific signal
const waitingForPayment = await storage.findWaitingFor('payment-confirmed');

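// Find by correlation ID: filter list() results in memory
// (or maintain a dedicated index in a custom backend)
const orderSaga = (await storage.list()).find((s) => s.correlationId === 'order-456');

// Sagas waiting on a signal can be narrowed by correlation ID directly
const orderAwaitingPayment = await storage.findWaitingFor('payment-confirmed', 'order-456');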

// Get saga progress from state
if (saga) {
  console.log('Status:', saga.status);
  console.log('State:', saga.state);
  console.log('Steps completed:', saga.steps.length);
  console.log('Waiting for:', saga.waitCondition?.event);
}
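For monitoring, a common first step is to aggregate list() results by status and flag suspended sagas that have not changed for a while. A minimal sketch, assuming only the `workflowId`, `status`, and `updatedAt` fields from StoredSaga; `summarize` and the staleness threshold are hypothetical.

```typescript
type SagaStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed';

interface SagaSummaryInput {
  workflowId: string;
  status: SagaStatus;
  updatedAt: Date;
}

interface SagaSummary {
  byStatus: Record<SagaStatus, number>;
  stale: string[]; // suspended sagas not updated within the threshold
}

function summarize(sagas: readonly SagaSummaryInput[], now: Date, staleAfterMs: number): SagaSummary {
  const byStatus: Record<SagaStatus, number> = { pending: 0, running: 0, suspended: 0, completed: 0, failed: 0 };
  const stale: string[] = [];
  for (const saga of sagas) {
    byStatus[saga.status] += 1;
    if (saga.status === 'suspended' && now.getTime() - saga.updatedAt.getTime() > staleAfterMs) {
      stale.push(saga.workflowId);
    }
  }
  return { byStatus, stale };
}
```

Passing it `await storage.list()` yields counts for a dashboard. A stale saga is not necessarily stuck, since a saga may legitimately wait days for a signal, so pick the threshold per saga type.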

Best Practices

  • Use correlation IDs - Link sagas to business entities (order ID, user ID) for easy lookup from webhooks and APIs.
  • Idempotent steps - Steps may replay on resume. Ensure side effects are idempotent or rely on step caching to prevent re-execution.
  • Meaningful step IDs - Use descriptive IDs like `charge-payment`, not `step-1`. Include context in loops: `process-item-${item.id}`.
  • Poll for sleeping sagas - Run a background worker to resume sagas where sleep has elapsed.
  • Monitor saga state - Build dashboards that track pending, suspended, and failed sagas for operational visibility.
  • Use typed signals - Declare signals in createSaga for compile-time safety and runtime validation.
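For the idempotent-steps advice, one common pattern is to derive an idempotency key from the workflow ID and the step ID, so a replayed step repeats the exact same request and the downstream service can deduplicate it. The sketch assumes a hypothetical `FakePaymentGateway` that honors such keys; real payment APIs differ in how they accept them.

```typescript
// Hypothetical downstream service that deduplicates by idempotency key
class FakePaymentGateway {
  private processed = new Map<string, string>(); // key -> transaction ID
  chargeCount = 0;
  totalChargedCents = 0;

  async charge(amountCents: number, idempotencyKey: string): Promise<string> {
    const existing = this.processed.get(idempotencyKey);
    if (existing) return existing; // replay: return the original result, no new charge
    this.chargeCount += 1;
    this.totalChargedCents += amountCents;
    const txn = `txn-${this.chargeCount}`;
    this.processed.set(idempotencyKey, txn);
    return txn;
  }
}

// Stable key: the same workflow and step ID produce the same key on every replay
function idempotencyKey(workflowId: string, stepId: string): string {
  return `${workflowId}:${stepId}`;
}

async function chargeStep(gateway: FakePaymentGateway, workflowId: string, amountCents: number): Promise<string> {
  return gateway.charge(amountCents, idempotencyKey(workflowId, 'charge-payment'));
}
```

This only works if step IDs are stable across replays, which is another reason to prefer descriptive IDs over positional ones.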