Saga Operations

Complete reference for all saga operations

Saga operations are awaited from async handlers to perform durable actions. Each operation is tracked and cached, enabling reliable replay on resume.

step

Execute a function and cache its result. On replay, returns the cached value without re-executing. Use for side effects like database writes or API calls.

step.tsTypeScript
// Signature
async function step<T>(
  id: string,
  fn: () => T | Promise<T>
): Promise<T>

// Examples
handler: () => async (ctx, { step }) => {
  const user = await step('create-user', async () => {
    return await db.users.create({ name: 'Alice' });
  });

  const apiResult = await step('call-api', async () => {
    const res = await fetch('https://api.example.com/data');
    return res.json();
  });

  // Sync functions work too
  const computed = await step('compute', () => {
    return heavyComputation(data);
  });

  return { user, apiResult, computed };
}
⚠️

Warning

Step IDs must be unique within a saga execution. Use descriptive names or include context like `process-item-${item.id}` for loops.

waitFor

Suspend the saga until an external signal arrives. The saga persists its state and resumes when the signal is sent. With createSaga, signals are typed based on the signals: declaration.

wait-for.tsTypeScript
// Signature (typed based on declared signals)
async function waitFor<K extends keyof TSignals>(
  event: K
): Promise<TSignals[K]>

// Example with typed signals
const PaymentSaga = createSaga({
  signals: {
    'payment-confirmed': z.object({
      transactionId: z.string(),
      amount: z.number(),
    }),
    'payment-failed': z.object({ reason: z.string() }),
  },

  handler: () => async (ctx, { waitFor }) => {
    // Typed! Only accepts declared signal names
    const payment = await waitFor('payment-confirmed');
    //    ^? { transactionId: string, amount: number }

    // TypeScript error:
    // await waitFor('undeclared'); // Error!

    return { txId: payment.transactionId };
  },
});

Sending Signals

External systems send signals to resume waiting sagas using signalSaga:

send-signal.tsTypeScript
import { signalSaga } from '@justscale/saga';

// From a webhook handler, API endpoint, etc.
await signalSaga(
  PaymentSaga,    // The saga definition
  container,      // DI container
  storage,        // Saga storage (has the suspended saga state)
  workflowId,     // The saga execution ID
  'payment-confirmed',  // Signal name (validated against declared signals)
  {                     // Payload (validated against Zod schema)
    transactionId: 'txn_123',
    amount: 99.99,
  }
);

// Or via HTTP with sagaSignal plugin
Post('/orders/:workflowId/confirm')
  .apply(body(z.object({ transactionId: z.string(), amount: z.number() })))
  .apply(sagaSignal(PaymentSaga))
  .handle(({ signalResult }) => signalResult)

sleep

Suspend the saga for a specified duration. The saga persists and automatically resumes after the time elapses.

sleep.tsTypeScript
// Signature
async function sleep(durationMs: number): Promise<void>

// Examples
import { seconds, minutes, hours, days } from '@justscale/saga';

handler: () => async (ctx, { step, sleep }) => {
  await step('send-welcome', () => sendEmail(ctx.body.email));

  // Wait 30 seconds
  await sleep(seconds(30));

  // Wait 5 minutes
  await sleep(minutes(5));

  // Wait 24 hours before sending reminder
  await sleep(hours(24));

  await step('send-reminder', () => sendReminder(ctx.body.email));

  // Wait 7 days for trial expiration check
  await sleep(days(7));

  return { completed: true };
}

compensate

Register a compensating action for a completed step. If the saga fails later, compensations run in reverse order to undo completed work.

compensate.tsTypeScript
// Signature
function compensate(
  stepId: string,
  fn: () => void | Promise<void>
): void

// Examples
handler: ({ inventory, payments }) =>
  async (ctx, { step, compensate }) => {
    // Reserve inventory with compensation
    const reservation = await step('reserve-inventory', async () => {
      return inventory.reserve(ctx.body.items);
    });

    compensate('reserve-inventory', async () => {
      await inventory.release(reservation.id);
    });

    // Charge payment with compensation
    const charge = await step('charge-card', async () => {
      return payments.charge({ amount: ctx.body.total });
    });

    compensate('charge-card', async () => {
      await payments.refund(charge.id);
    });

    // If this fails, compensations run in reverse:
    // 1. Refund the charge
    // 2. Release inventory reservation
    await step('create-shipment', async () => {
      return shipping.create(reservation.id);
    });

    return { success: true };
  }
ℹ️

Info

Compensations are for business-level rollbacks, not transaction rollbacks. They handle cases like "refund payment if shipping fails" rather than database-level atomicity.

race

Wait for the first of multiple operations to complete. Useful for timeouts, cancellation, or competing events.

race.tsTypeScript
// Signature
async function race<T extends Record<string, () => Promise<unknown>>>(
  branches: T
): Promise<{ [K in keyof T]?: Awaited<ReturnType<T[K]>> }>

// Examples
handler: () => async (ctx, { waitFor, sleep, race }) => {
  // Race between payment and timeout
  const result = await race({
    payment: () => waitFor('payment-received'),
    timeout: () => sleep(minutes(30)),
  });

  if (result.timeout !== undefined) {
    throw new Error('Payment timeout - order cancelled');
  }

  // Payment arrived first
  const payment = result.payment!;

  // Race between multiple events
  const event = await race({
    approve: () => waitFor('approved'),
    reject: () => waitFor('rejected'),
    cancel: () => waitFor('cancelled'),
    expire: () => sleep(days(7)),
  });

  if (event.approve) {
    return { status: 'approved', by: event.approve.by };
  } else if (event.reject) {
    throw new Error(`Rejected: ${event.reject.reason}`);
  } else {
    throw new Error('Cancelled or expired');
  }
}

parallel

Execute multiple operations in parallel and wait for all to complete. Useful for independent operations that can run concurrently.

parallel.tsTypeScript
// Signature
async function parallel<T extends (() => Promise<unknown>)[]>(
  operations: T
): Promise<{ [K in keyof T]: Awaited<ReturnType<T[K]>> }>

// Examples
handler: ({ userService, orderService, prefService }) =>
  async (ctx, { step, parallel }) => {
    // Parallel API calls
    const [user, orders, preferences] = await parallel([
      () => step('get-user', () => userService.findById(ctx.params.userId)),
      () => step('get-orders', () => orderService.findByUser(ctx.params.userId)),
      () => step('get-prefs', () => prefService.findByUser(ctx.params.userId)),
    ]);

    // Parallel checks
    const [inventory, pricing, shipping] = await parallel([
      () => step('check-inventory', () => inventoryService.check(items)),
      () => step('calculate-price', () => pricingService.calculate(items)),
      () => step('estimate-shipping', () => shippingService.estimate(address)),
    ]);

    if (!inventory.available) {
      throw new Error('Items out of stock');
    }

    return { total: pricing.subtotal + shipping.cost };
  }

call

Call another saga as a sub-workflow. The child saga runs to completion (or suspension) and its result is returned.

call.tsTypeScript
// Signature
async function call<TSaga extends SagaDef>(
  saga: TSaga,
  input: SagaInput<TSaga>
): Promise<SagaOutput<TSaga>>

// Example: Compose sagas
const ShippingSaga = createSaga({
  path: 'shipping/create',
  body: z.object({ orderId: z.string(), items: z.array(ItemSchema) }),
  handler: () => async (ctx, ops) => {
    // ... shipping logic
    return { trackingNumber: 'TRACK-123' };
  },
});

const OrderSaga = createSaga({
  path: 'orders/process',
  inject: {
    shippingSaga: ShippingSaga,  // Inject child saga
  },
  handler: ({ shippingSaga }) =>
    async (ctx, { step, call }) => {
      // ... payment logic

      // Call child saga
      const shipping = await call(shippingSaga, {
        orderId: ctx.params.orderId,
        items: ctx.body.items,
      });

      return {
        orderId: ctx.params.orderId,
        trackingNumber: shipping.trackingNumber,
      };
    },
});

condition

Capture a non-deterministic condition for consistent replay. Use for time-based or random conditions that must remain constant across executions.

condition.tsTypeScript
// Signature
async function condition(
  id: string,
  fn: () => boolean
): Promise<boolean>

// Examples
handler: () => async (ctx, { step, condition }) => {
  // Capture time-based condition
  const isBusinessHours = await condition('is-business-hours', () => {
    const hour = new Date().getHours();
    return hour >= 9 && hour < 17;
  });

  if (isBusinessHours) {
    await step('process-immediately', () => processOrder(ctx.body));
  } else {
    await step('queue-for-morning', () => queueOrder(ctx.body));
  }

  // Random routing (captured for replay)
  const useNewSystem = await condition('use-new-system', () => {
    return Math.random() < 0.1; // 10% rollout
  });

  return { usedNewSystem: useNewSystem };
}

each

Iterate over items with saga semantics. Each iteration is tracked, allowing resume from the last completed item.

each.tsTypeScript
// Signature
async function each<T, R>(
  id: string,
  items: T[],
  fn: (item: T, index: number) => Promise<R>
): Promise<R[]>

// Examples
handler: () => async (ctx, { step, each, waitFor }) => {
  const items = await step('get-items', () => fetchItems());

  // Process each item with saga tracking
  const results = await each('process-items', items, async (item, index) => {
    // Each iteration is a tracked step
    return await processItem(item);
  });

  // Can include waits in iteration
  await each('notify-users', users, async (user) => {
    await sendNotification(user);
    await waitFor(`user-${user.id}-acknowledged`);
  });

  return { processed: results.length };
}

getState / setState

Read and update saga state. State changes are persisted and survive restarts.

state.tsTypeScript
// Signatures
function getState(): TState
function setState(updater: (state: TState) => TState | void): void

// Example with state schema
const OrderSaga = createSaga({
  path: 'orders/process',

  state: z.object({
    progress: z.number(),
    processedItems: z.array(z.string()),
    lastCheckpoint: z.string().optional(),
  }),

  initialState: () => ({
    progress: 0,
    processedItems: [],
  }),

  handler: () => async (ctx, { step, getState, setState }) => {
    const items = ctx.body.items;

    for (const item of items) {
      await step(`process-${item.id}`, () => processItem(item));

      // Update state after each item
      setState((state) => {
        state.processedItems.push(item.id);
        state.progress = (state.processedItems.length / items.length) * 100;
        state.lastCheckpoint = item.id;
      });
    }

    const finalState = getState();
    return {
      processed: finalState.processedItems.length,
      progress: finalState.progress,
    };
  },
});

retry

Retry an operation with configurable attempts and backoff:

retry.tsTypeScript
// Signature
async function retry<T>(
  id: string,
  fn: () => Promise<T>,
  options?: {
    maxAttempts?: number;
    backoff?: 'fixed' | 'exponential';
    delayMs?: number;
  }
): Promise<T>

// Examples
handler: () => async (ctx, { retry }) => {
  // Retry up to 3 times with exponential backoff
  const result = await retry(
    'call-flaky-api',
    () => flakyApiCall(ctx.body),
    {
      maxAttempts: 3,
      backoff: 'exponential',
      delayMs: 1000, // 1s, 2s, 4s
    }
  );

  // Fixed delay retry
  const data = await retry(
    'fetch-data',
    () => fetchData(),
    {
      maxAttempts: 5,
      backoff: 'fixed',
      delayMs: 500, // 500ms between each attempt
    }
  );

  return { result, data };
}

runCompensations

Manually trigger compensating actions. Usually called automatically on failure, but can be invoked explicitly:

run-compensations.tsTypeScript
// Signature
async function runCompensations(stepIds?: string[]): Promise<void>

// Example: Manual rollback on business condition
handler: ({ payments, inventory }) =>
  async (ctx, { step, compensate, runCompensations }) => {
    const reservation = await step('reserve', () => inventory.reserve(items));
    compensate('reserve', () => inventory.release(reservation.id));

    const charge = await step('charge', () => payments.charge(amount));
    compensate('charge', () => payments.refund(charge.id));

    // Business validation after steps
    const fraudCheck = await step('fraud-check', () => checkFraud(charge));

    if (fraudCheck.suspicious) {
      // Manually run all compensations
      await runCompensations();
      throw new Error('Order flagged for fraud review');
    }

    // Or run specific compensations
    if (fraudCheck.needsReview) {
      await runCompensations(['charge']); // Only refund, keep reservation
    }

    return { success: true };
  }

Operation Summary

Quick reference for all saga operations:

  • step(id, fn) - Execute and cache a function result
  • waitFor(event) - Suspend until external signal (typed)
  • sleep(ms) - Suspend for duration
  • compensate(stepId, fn) - Register rollback action
  • race(branches) - Wait for first of multiple operations
  • parallel(operations) - Execute operations concurrently
  • call(saga, input) - Call child saga
  • condition(id, fn) - Capture non-deterministic condition
  • each(id, items, fn) - Iterate with saga tracking
  • getState() / setState(fn) - Read/update saga state
  • retry(id, fn, opts) - Retry with backoff
  • runCompensations() - Trigger rollback manually