Saga Operations
Complete reference for all saga operations
Saga operations are awaited from async handlers to perform durable actions. Each operation is tracked and cached, enabling reliable replay on resume.
step
Execute a function and cache its result. On replay, returns the cached value without re-executing. Use for side effects like database writes or API calls.
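The caching behaviour can be pictured with a small in-memory model. This is only a sketch, not the library's implementation: `Journal`, `cachedStep`, and `demoReplay` are hypothetical names, and a real saga would persist the journal to storage rather than hold it in a `Map`.

```typescript
// Hypothetical in-memory journal: step id -> cached result.
type Journal = Map<string, unknown>;

// Run fn once per id; afterwards return the recorded value instead of re-executing.
async function cachedStep<T>(
  journal: Journal,
  id: string,
  fn: () => T | Promise<T>
): Promise<T> {
  if (journal.has(id)) {
    return journal.get(id) as T; // replay: the side effect is skipped
  }
  const result = await fn();
  journal.set(id, result);
  return result;
}

// Counts how often the "side effect" really runs.
let sideEffects = 0;

async function handlerBody(journal: Journal): Promise<number> {
  return cachedStep(journal, 'create-user', () => {
    sideEffects += 1;
    return 42;
  });
}

async function demoReplay(): Promise<{ first: number; second: number; sideEffects: number }> {
  const journal: Journal = new Map();
  const first = await handlerBody(journal);  // first execution
  const second = await handlerBody(journal); // simulated replay after a resume
  return { first, second, sideEffects };
}
```

Because the journal is keyed by ID, two steps that share an ID would also share a cached result in this model.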
// Signature
async function step<T>(
id: string,
fn: () => T | Promise<T>
): Promise<T>
// Examples
handler: () => async (ctx, { step }) => {
const user = await step('create-user', async () => {
return await db.users.create({ name: 'Alice' });
});
const apiResult = await step('call-api', async () => {
const res = await fetch('https://api.example.com/data');
return res.json();
});
// Sync functions work too
const computed = await step('compute', () => {
return heavyComputation(data);
});
return { user, apiResult, computed };
}
Warning
Step results are cached by ID, so give each iteration its own step ID, such as `process-item-${item.id}`, for loops.
waitFor
Suspend the saga until an external signal arrives. The saga persists its state and resumes when the signal is sent. With createSaga, signal names and payload types are derived from the signals: declaration.
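As a rough mental model of suspend-and-resume, the sketch below parks a promise until a matching signal is delivered. `Signals` and `SignalBox` are hypothetical names used only for illustration; the real library persists the suspended saga instead of keeping a resolver in memory.

```typescript
// Hypothetical signal map: signal name -> payload type.
type Signals = {
  'payment-confirmed': { transactionId: string; amount: number };
  'payment-failed': { reason: string };
};

class SignalBox {
  // One pending waiter per signal name (enough for this sketch).
  private waiters = new Map<string, (payload: unknown) => void>();

  // Returns a promise that stays pending until send() is called for the same name.
  waitFor<K extends keyof Signals>(event: K): Promise<Signals[K]> {
    return new Promise<Signals[K]>((resolve) => {
      this.waiters.set(event, (payload) => resolve(payload as Signals[K]));
    });
  }

  // Delivers a payload; returns false when nothing is waiting for that signal.
  send<K extends keyof Signals>(event: K, payload: Signals[K]): boolean {
    const wake = this.waiters.get(event);
    if (wake === undefined) return false;
    this.waiters.delete(event);
    wake(payload);
    return true;
  }
}
```

The generic parameter `K` is what restricts callers to declared signal names and ties each name to its payload type.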
// Signature (typed based on declared signals)
async function waitFor<K extends keyof TSignals>(
event: K
): Promise<TSignals[K]>
// Example with typed signals
const PaymentSaga = createSaga({
signals: {
'payment-confirmed': z.object({
transactionId: z.string(),
amount: z.number(),
}),
'payment-failed': z.object({ reason: z.string() }),
},
handler: () => async (ctx, { waitFor }) => {
// Typed! Only accepts declared signal names
const payment = await waitFor('payment-confirmed');
// ^? { transactionId: string, amount: number }
// TypeScript error:
// await waitFor('undeclared'); // Error!
return { txId: payment.transactionId };
},
});
Sending Signals
External systems send signals to resume waiting sagas using signalSaga:
import { signalSaga } from '@justscale/saga';
// From a webhook handler, API endpoint, etc.
await signalSaga(
PaymentSaga, // The saga definition
container, // DI container
storage, // Saga storage (has the suspended saga state)
workflowId, // The saga execution ID
'payment-confirmed', // Signal name (validated against declared signals)
{ // Payload (validated against Zod schema)
transactionId: 'txn_123',
amount: 99.99,
}
);
// Or via HTTP with sagaSignal plugin
Post('/orders/:workflowId/confirm')
.apply(body(z.object({ transactionId: z.string(), amount: z.number() })))
.apply(sagaSignal(PaymentSaga))
.handle(({ signalResult }) => signalResult)
sleep
Suspend the saga for a specified duration. The saga persists and automatically resumes after the time elapses.
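One way to picture a sleep that survives restarts is to store an absolute wake-up time instead of relying on an in-process timer. The sketch below assumes duration helpers that simply convert to milliseconds (consistent with `sleep(durationMs)`); `toMs`, `SleepRecord`, `startSleep`, and `remainingMs` are hypothetical names, not part of the library.

```typescript
const MS_PER_SECOND = 1000;

// Hypothetical stand-ins for duration helpers: each returns milliseconds.
const toMs = {
  seconds: (n: number): number => n * MS_PER_SECOND,
  minutes: (n: number): number => n * 60 * MS_PER_SECOND,
  hours: (n: number): number => n * 60 * 60 * MS_PER_SECOND,
  days: (n: number): number => n * 24 * 60 * 60 * MS_PER_SECOND,
};

// What gets persisted when the saga suspends: an absolute timestamp in epoch ms.
interface SleepRecord {
  wakeAt: number;
}

function startSleep(nowMs: number, durationMs: number): SleepRecord {
  return { wakeAt: nowMs + durationMs };
}

// After a restart, only the remaining time has to be waited (never negative).
function remainingMs(record: SleepRecord, nowMs: number): number {
  return Math.max(0, record.wakeAt - nowMs);
}
```

With this model, a process that restarts halfway through a sleep waits only for the remainder rather than starting the full duration again.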
// Signature
async function sleep(durationMs: number): Promise<void>
// Examples
import { seconds, minutes, hours, days } from '@justscale/saga';
handler: () => async (ctx, { step, sleep }) => {
await step('send-welcome', () => sendEmail(ctx.body.email));
// Wait 30 seconds
await sleep(seconds(30));
// Wait 5 minutes
await sleep(minutes(5));
// Wait 24 hours before sending reminder
await sleep(hours(24));
await step('send-reminder', () => sendReminder(ctx.body.email));
// Wait 7 days for trial expiration check
await sleep(days(7));
return { completed: true };
}
compensate
Register a compensating action for a completed step. If the saga fails later, compensations run in reverse order to undo completed work.
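The reverse-order rule is essentially a stack: the most recently registered compensation runs first. The sketch below illustrates that idea only; `CompensationStack` and `demoRollback` are hypothetical and say nothing about how the library stores or triggers compensations.

```typescript
type Compensation = { stepId: string; undo: () => void | Promise<void> };

class CompensationStack {
  private entries: Compensation[] = [];

  register(stepId: string, undo: () => void | Promise<void>): void {
    this.entries.push({ stepId, undo });
  }

  // Pops entries so the last registered compensation runs first.
  async runAll(): Promise<string[]> {
    const ran: string[] = [];
    while (this.entries.length > 0) {
      const entry = this.entries.pop() as Compensation;
      await entry.undo();
      ran.push(entry.stepId);
    }
    return ran;
  }
}

// Simulates two completed steps followed by a failing third step.
async function demoRollback(): Promise<string[]> {
  const log: string[] = [];
  const stack = new CompensationStack();
  try {
    log.push('reserve');
    stack.register('reserve-inventory', () => { log.push('release'); });
    log.push('charge');
    stack.register('charge-card', () => { log.push('refund'); });
    throw new Error('create-shipment failed');
  } catch {
    await stack.runAll(); // undo completed work, newest first
  }
  return log;
}
```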
// Signature
function compensate(
stepId: string,
fn: () => void | Promise<void>
): void
// Examples
handler: ({ inventory, payments, shipping }) =>
async (ctx, { step, compensate }) => {
// Reserve inventory with compensation
const reservation = await step('reserve-inventory', async () => {
return inventory.reserve(ctx.body.items);
});
compensate('reserve-inventory', async () => {
await inventory.release(reservation.id);
});
// Charge payment with compensation
const charge = await step('charge-card', async () => {
return payments.charge({ amount: ctx.body.total });
});
compensate('charge-card', async () => {
await payments.refund(charge.id);
});
// If this fails, compensations run in reverse:
// 1. Refund the charge
// 2. Release inventory reservation
await step('create-shipment', async () => {
return shipping.create(reservation.id);
});
return { success: true };
}
race
Wait for the first of multiple operations to complete. Useful for timeouts, cancellation, or competing events.
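A keyed race can be sketched on top of `Promise.race`. This is an illustration under assumptions, not the library's implementation: `raceByKey`, `delay`, and `demoRace` are hypothetical, and unlike a durable saga this version neither persists the outcome nor cancels the losing branches.

```typescript
// Resolves with `value` after `ms` milliseconds.
const delay = <T>(ms: number, value: T): Promise<T> =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// Starts every branch and returns an object holding only the winner's key.
async function raceByKey(
  branches: Record<string, () => Promise<unknown>>
): Promise<Record<string, unknown>> {
  const contenders = Object.keys(branches).map((key) =>
    branches[key]().then((value) => ({ key, value }))
  );
  const winner = await Promise.race(contenders);
  return { [winner.key]: winner.value };
}

async function demoRace(): Promise<Record<string, unknown>> {
  return raceByKey({
    payment: () => delay(5, { transactionId: 'txn_123' }),
    timeout: () => delay(50, undefined), // a timer branch resolves with no value
  });
}
```

In this sketch a branch that resolves with no value, such as a timer, still leaves its key on the result, so testing for the key with `in` distinguishes the winner more reliably than comparing the value with `undefined`.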
// Signature
async function race<T extends Record<string, () => Promise<unknown>>>(
branches: T
): Promise<{ [K in keyof T]?: Awaited<ReturnType<T[K]>> }>
// Examples
handler: () => async (ctx, { waitFor, sleep, race }) => {
// Race between payment and timeout
const result = await race({
payment: () => waitFor('payment-received'),
timeout: () => sleep(minutes(30)),
});
// sleep resolves with no value, so detect the timeout by the missing payment
if (result.payment === undefined) {
throw new Error('Payment timeout - order cancelled');
}
// Payment arrived first
const payment = result.payment!;
// Race between multiple events
const event = await race({
approve: () => waitFor('approved'),
reject: () => waitFor('rejected'),
cancel: () => waitFor('cancelled'),
expire: () => sleep(days(7)),
});
if (event.approve) {
return { status: 'approved', by: event.approve.by };
} else if (event.reject) {
throw new Error(`Rejected: ${event.reject.reason}`);
} else {
throw new Error('Cancelled or expired');
}
}
parallel
Execute multiple operations in parallel and wait for all to complete. Useful for independent operations that can run concurrently.
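The "start everything, then wait for all" behaviour resembles `Promise.all` over an array of thunks. The sketch below uses hypothetical names (`runAll`, `resolveAfter`, `demoParallel`) and shows that results keep the order of the input array even when operations finish in a different order.

```typescript
// Resolves with `value` after `ms` milliseconds.
const resolveAfter = <T>(ms: number, value: T): Promise<T> =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// Starts all operations immediately and waits for every one of them.
async function runAll<T>(operations: Array<() => Promise<T>>): Promise<T[]> {
  return Promise.all(operations.map((operation) => operation()));
}

async function demoParallel(): Promise<{ results: string[]; finished: string[] }> {
  const finished: string[] = []; // completion order
  const task = (name: string, ms: number) => () =>
    resolveAfter(ms, name).then((value) => {
      finished.push(value);
      return value;
    });
  const results = await runAll([task('user', 30), task('orders', 10), task('prefs', 20)]);
  return { results, finished };
}
```

Note that `Promise.all` rejects as soon as one operation fails; the source does not say whether the library behaves the same way, so treat that detail as an open question.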
// Signature
async function parallel<T extends (() => Promise<unknown>)[]>(
operations: T
): Promise<{ [K in keyof T]: Awaited<ReturnType<T[K]>> }>
// Examples
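handler: ({ userService, orderService, prefService, inventoryService, pricingService, shippingService }) =>
async (ctx, { step, parallel }) => {
const { items, address } = ctx.body; // assumes the body schema declares items and address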
// Parallel API calls
const [user, orders, preferences] = await parallel([
() => step('get-user', () => userService.findById(ctx.params.userId)),
() => step('get-orders', () => orderService.findByUser(ctx.params.userId)),
() => step('get-prefs', () => prefService.findByUser(ctx.params.userId)),
]);
// Parallel checks
const [inventory, pricing, shipping] = await parallel([
() => step('check-inventory', () => inventoryService.check(items)),
() => step('calculate-price', () => pricingService.calculate(items)),
() => step('estimate-shipping', () => shippingService.estimate(address)),
]);
if (!inventory.available) {
throw new Error('Items out of stock');
}
return { total: pricing.subtotal + shipping.cost };
}
call
Call another saga as a sub-workflow. The child saga runs to completion (or suspension) and its result is returned.
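The input and output types of a child saga can be derived from its definition with conditional types. The sketch below is a guess at the general technique, not the library's actual `SagaDef`, `SagaInput`, or `SagaOutput`; `SagaLike`, `InputOf`, `OutputOf`, and `callChild` are hypothetical.

```typescript
// Hypothetical minimal saga shape: something that can be run with an input.
interface SagaLike<I, O> {
  run(input: I): Promise<O>;
}

// Extract the input and output types from a saga definition.
type InputOf<S> = S extends SagaLike<infer I, unknown> ? I : never;
type OutputOf<S> = S extends SagaLike<any, infer O> ? O : never;

// Runs the child and returns its result, typed from the child's own definition.
async function callChild<S extends SagaLike<any, any>>(
  saga: S,
  input: InputOf<S>
): Promise<OutputOf<S>> {
  return saga.run(input);
}

const shippingSaga: SagaLike<{ orderId: string }, { trackingNumber: string }> = {
  run: async (input) => ({ trackingNumber: `TRACK-${input.orderId}` }),
};

async function demoCall(): Promise<string> {
  const shipping = await callChild(shippingSaga, { orderId: '123' });
  return shipping.trackingNumber; // typed as string
}
```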
// Signature
async function call<TSaga extends SagaDef>(
saga: TSaga,
input: SagaInput<TSaga>
): Promise<SagaOutput<TSaga>>
// Example: Compose sagas
const ShippingSaga = createSaga({
path: 'shipping/create',
body: z.object({ orderId: z.string(), items: z.array(ItemSchema) }),
handler: () => async (ctx, ops) => {
// ... shipping logic
return { trackingNumber: 'TRACK-123' };
},
});
const OrderSaga = createSaga({
path: 'orders/process',
inject: {
shippingSaga: ShippingSaga, // Inject child saga
},
handler: ({ shippingSaga }) =>
async (ctx, { step, call }) => {
// ... payment logic
// Call child saga
const shipping = await call(shippingSaga, {
orderId: ctx.params.orderId,
items: ctx.body.items,
});
return {
orderId: ctx.params.orderId,
trackingNumber: shipping.trackingNumber,
};
},
});
condition
Capture a non-deterministic condition for consistent replay. Use for time-based or random conditions that must remain constant across executions.
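Why capture matters: if the handler is replayed at a different time, re-evaluating the condition could send it down a different branch than the first run took. The sketch below records the first answer and reuses it; `captureCondition` and `chooseRoute` are hypothetical, and the hour is passed in explicitly so the behaviour is easy to follow.

```typescript
// Evaluate once per id, then always return the recorded answer.
function captureCondition(
  journal: Map<string, boolean>,
  id: string,
  evaluate: () => boolean
): boolean {
  const recorded = journal.get(id);
  if (recorded !== undefined) return recorded; // replay: ignore the current time
  const value = evaluate();
  journal.set(id, value);
  return value;
}

// Picks a branch based on a time-dependent condition.
function chooseRoute(journal: Map<string, boolean>, currentHour: number): string {
  const isBusinessHours = captureCondition(
    journal,
    'is-business-hours',
    () => currentHour >= 9 && currentHour < 17
  );
  return isBusinessHours ? 'process-immediately' : 'queue-for-morning';
}
```

Calling `chooseRoute` with the same journal at 10:00 and again at 22:00 yields the same branch both times, while a fresh journal at 22:00 takes the other branch.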
// Signature
async function condition(
id: string,
fn: () => boolean
): Promise<boolean>
// Examples
handler: () => async (ctx, { step, condition }) => {
// Capture time-based condition
const isBusinessHours = await condition('is-business-hours', () => {
const hour = new Date().getHours();
return hour >= 9 && hour < 17;
});
if (isBusinessHours) {
await step('process-immediately', () => processOrder(ctx.body));
} else {
await step('queue-for-morning', () => queueOrder(ctx.body));
}
// Random routing (captured for replay)
const useNewSystem = await condition('use-new-system', () => {
return Math.random() < 0.1; // 10% rollout
});
return { usedNewSystem: useNewSystem };
}
each
Iterate over items with saga semantics. Each iteration is tracked, allowing resume from the last completed item.
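Resuming from the last completed item can be modelled by recording each iteration's result under a per-index key. The sketch below assumes a key format of `id:index`, which is an illustration only; `eachTracked` and `demoResume` are hypothetical names.

```typescript
// Runs fn for every item, skipping iterations whose result is already recorded.
async function eachTracked<T, R>(
  journal: Map<string, unknown>,
  id: string,
  items: T[],
  fn: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (let index = 0; index < items.length; index++) {
    const key = `${id}:${index}`; // assumed key format
    if (!journal.has(key)) {
      journal.set(key, await fn(items[index], index));
    }
    results.push(journal.get(key) as R);
  }
  return results;
}

// First run crashes on the third item; the second run only redoes that item.
async function demoResume(): Promise<{ results: number[]; calls: number[] }> {
  const journal = new Map<string, unknown>();
  const calls: number[] = []; // indexes that were actually executed
  let failOnce = true;
  const double = async (n: number, index: number): Promise<number> => {
    calls.push(index);
    if (index === 2 && failOnce) {
      failOnce = false;
      throw new Error('simulated crash');
    }
    return n * 2;
  };
  try {
    await eachTracked(journal, 'process-items', [1, 2, 3], double);
  } catch {
    // a real saga would be suspended or retried at this point
  }
  const results = await eachTracked(journal, 'process-items', [1, 2, 3], double);
  return { results, calls };
}
```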
// Signature
async function each<T, R>(
id: string,
items: T[],
fn: (item: T, index: number) => Promise<R>
): Promise<R[]>
// Examples
handler: () => async (ctx, { step, each, waitFor }) => {
const items = await step('get-items', () => fetchItems());
// Process each item with saga tracking
const results = await each('process-items', items, async (item, index) => {
// Each iteration is a tracked step
return await processItem(item);
});
// Can include waits in iteration
await each('notify-users', ctx.body.users, async (user) => { // assumes users arrive in the request body
await sendNotification(user);
await waitFor(`user-${user.id}-acknowledged`);
});
return { processed: results.length };
}
getState / setState
Read and update saga state. State changes are persisted and survive restarts.
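An updater typed as `(state) => TState | void` can either mutate the state it receives or return a replacement. The sketch below shows one way such an updater might be applied; `applyUpdate` is hypothetical, and the JSON-based copy assumes the state is plain serializable data.

```typescript
// Applies an updater that may mutate the draft in place or return a new state.
function applyUpdate<S>(current: S, updater: (draft: S) => S | void): S {
  // Naive deep copy so the previous state is left untouched (JSON-serializable state only).
  const draft = JSON.parse(JSON.stringify(current)) as S;
  const returned = updater(draft);
  return (returned === undefined ? draft : returned) as S;
}

interface ProgressState {
  progress: number;
  processedItems: string[];
}

function demoState(): { before: ProgressState; after: ProgressState } {
  const before: ProgressState = { progress: 0, processedItems: [] };
  const after = applyUpdate(before, (draft) => {
    draft.processedItems.push('item-1'); // mutate in place, return nothing
    draft.progress = 50;
  });
  return { before, after };
}
```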
// Signatures
function getState(): TState
function setState(updater: (state: TState) => TState | void): void
// Example with state schema
const OrderSaga = createSaga({
path: 'orders/process',
state: z.object({
progress: z.number(),
processedItems: z.array(z.string()),
lastCheckpoint: z.string().optional(),
}),
initialState: () => ({
progress: 0,
processedItems: [],
}),
handler: () => async (ctx, { step, getState, setState }) => {
const items = ctx.body.items;
for (const item of items) {
await step(`process-${item.id}`, () => processItem(item));
// Update state after each item
setState((state) => {
state.processedItems.push(item.id);
state.progress = (state.processedItems.length / items.length) * 100;
state.lastCheckpoint = item.id;
});
}
const finalState = getState();
return {
processed: finalState.processedItems.length,
progress: finalState.progress,
};
},
});
retry
Retry an operation with configurable attempts and backoff:
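The interaction between attempts and backoff can be sketched as a loop that waits only between failed attempts. This is an assumption about the semantics, not the library's code: `backoffDelay`, `retrySketch`, and `demoRetry` are hypothetical, exponential backoff is assumed to double the base delay, and the wait function is injected so the delays can be observed without real timers.

```typescript
type Backoff = 'fixed' | 'exponential';

interface RetryOptions {
  maxAttempts: number;
  backoff: Backoff;
  delayMs: number;
}

// Delay after the n-th failed attempt (n starts at 1); exponential doubles each time.
function backoffDelay(failedAttempts: number, delayMs: number, backoff: Backoff): number {
  return backoff === 'exponential' ? delayMs * Math.pow(2, failedAttempts - 1) : delayMs;
}

async function retrySketch<T>(
  fn: () => Promise<T>,
  options: RetryOptions,
  wait: (ms: number) => Promise<void>
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < options.maxAttempts) {
        await wait(backoffDelay(attempt, options.delayMs, options.backoff));
      }
    }
  }
  throw lastError; // every attempt failed
}

// Fails twice, then succeeds; records the waits instead of sleeping.
async function demoRetry(): Promise<{ value: string; waits: number[] }> {
  const waits: number[] = [];
  let calls = 0;
  const flaky = async (): Promise<string> => {
    calls += 1;
    if (calls < 3) throw new Error(`attempt ${calls} failed`);
    return 'ok';
  };
  const value = await retrySketch(
    flaky,
    { maxAttempts: 3, backoff: 'exponential', delayMs: 1000 },
    async (ms) => { waits.push(ms); }
  );
  return { value, waits };
}
```

Under this model, three attempts involve at most two waits, since nothing is waited for after the final attempt.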
// Signature
async function retry<T>(
id: string,
fn: () => Promise<T>,
options?: {
maxAttempts?: number;
backoff?: 'fixed' | 'exponential';
delayMs?: number;
}
): Promise<T>
// Examples
handler: () => async (ctx, { retry }) => {
// Retry up to 3 times with exponential backoff
const result = await retry(
'call-flaky-api',
() => flakyApiCall(ctx.body),
{
maxAttempts: 3,
backoff: 'exponential',
delayMs: 1000, // base delay, doubled after each failed attempt: 1s, then 2s
}
);
// Fixed delay retry
const data = await retry(
'fetch-data',
() => fetchData(),
{
maxAttempts: 5,
backoff: 'fixed',
delayMs: 500, // 500ms between each attempt
}
);
return { result, data };
}
runCompensations
Manually trigger compensating actions. Usually called automatically on failure, but can be invoked explicitly:
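Selective rollback can be modelled as a filter over the registered compensations, still applied newest first. The sketch below is hypothetical (`createCompensations` is not a library function) and makes one explicit assumption the source does not confirm: a compensation that has run is removed, so a later rollback does not run it a second time.

```typescript
type Undo = () => void | Promise<void>;

function createCompensations() {
  const registered: Array<{ stepId: string; undo: Undo }> = [];
  return {
    compensate(stepId: string, undo: Undo): void {
      registered.push({ stepId, undo });
    },
    // Runs matching compensations in reverse registration order; all of them when no ids are given.
    async runCompensations(stepIds?: string[]): Promise<string[]> {
      const ran: string[] = [];
      for (let i = registered.length - 1; i >= 0; i--) {
        const { stepId, undo } = registered[i];
        if (stepIds !== undefined && stepIds.indexOf(stepId) === -1) continue;
        registered.splice(i, 1); // assumption: remove first so the same undo never runs twice
        await undo();
        ran.push(stepId);
      }
      return ran;
    },
  };
}
```

With `reserve` and `charge` registered, `runCompensations(['charge'])` undoes only the charge, and a later `runCompensations()` undoes the remaining reservation.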
// Signature
async function runCompensations(stepIds?: string[]): Promise<void>
// Example: Manual rollback on business condition
handler: ({ payments, inventory }) =>
async (ctx, { step, compensate, runCompensations }) => {
const reservation = await step('reserve', () => inventory.reserve(ctx.body.items));
compensate('reserve', () => inventory.release(reservation.id));
const charge = await step('charge', () => payments.charge({ amount: ctx.body.total }));
compensate('charge', () => payments.refund(charge.id));
// Business validation after steps
const fraudCheck = await step('fraud-check', () => checkFraud(charge));
if (fraudCheck.suspicious) {
// Manually run all compensations
await runCompensations();
throw new Error('Order flagged for fraud review');
}
// Or run specific compensations
if (fraudCheck.needsReview) {
await runCompensations(['charge']); // Only refund, keep reservation
}
return { success: true };
}
Operation Summary
Quick reference for all saga operations:
step(id, fn) - Execute and cache a function result
waitFor(event) - Suspend until external signal (typed)
sleep(ms) - Suspend for duration
compensate(stepId, fn) - Register rollback action
race(branches) - Wait for first of multiple operations
parallel(operations) - Execute operations concurrently
call(saga, input) - Call child saga
condition(id, fn) - Capture non-deterministic condition
each(id, items, fn) - Iterate with saga tracking
getState() / setState(fn) - Read/update saga state
retry(id, fn, opts) - Retry with backoff
runCompensations() - Trigger rollback manually