Saga Persistence
Store and resume durable workflows with pluggable storage
Sagas need persistent storage to survive restarts and resume after waiting for external events. JustScale provides a storage abstraction with in-memory and PostgreSQL implementations, plus the ability to create custom backends.
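To make "survive restarts and resume" concrete, here is a minimal sketch of the idea, independent of JustScale's actual API. The names `ToySagaRecord`, `durable`, `persist`, `restore`, `startOrder`, and `deliverSignal` are hypothetical and exist only for this illustration. The point is that everything needed to continue the saga lives in the serialized record, not in process memory.

```typescript
// Hypothetical, trimmed-down record; a real stored saga carries more fields.
interface ToySagaRecord {
  workflowId: string;
  status: 'suspended' | 'completed';
  waitingFor?: string;
  output?: unknown;
}

// Stand-in for a durable store: only serialized strings are kept, so nothing
// held in process memory is needed to pick the saga up again.
const durable = new Map<string, string>();

function persist(record: ToySagaRecord): void {
  durable.set(record.workflowId, JSON.stringify(record));
}

function restore(workflowId: string): ToySagaRecord | undefined {
  const raw = durable.get(workflowId);
  return raw === undefined ? undefined : (JSON.parse(raw) as ToySagaRecord);
}

// Before the restart: run until the saga has to wait, then persist it.
function startOrder(workflowId: string): ToySagaRecord {
  const record: ToySagaRecord = {
    workflowId,
    status: 'suspended',
    waitingFor: 'payment-confirmed',
  };
  persist(record);
  return record;
}

// After the restart: the event arrives, the record is reloaded and finished.
function deliverSignal(
  workflowId: string,
  event: string,
  payload: unknown,
): ToySagaRecord | undefined {
  const record = restore(workflowId);
  if (!record || record.status !== 'suspended' || record.waitingFor !== event) {
    return undefined; // unknown saga, or not waiting for this event
  }
  const resumed: ToySagaRecord = { workflowId, status: 'completed', output: payload };
  persist(resumed);
  return resumed;
}
```

Calling `startOrder('order-123')` and later `deliverSignal('order-123', 'payment-confirmed', payload)` completes the saga using only what was persisted. A signal for a different event, or for a saga that has already completed, is ignored.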
Storage Abstraction
The SagaStorage abstract class defines the storage interface. Bind an implementation via DI to enable persistence:
import { createFeatureBuilder, bindService } from '@justscale/core';
import {
  SagaStorage,
  InMemorySagaStorage,
  SagaStorageService,
} from '@justscale/saga';
// For development/testing - use in-memory storage
export const DevFeature = createFeatureBuilder()
  .name('dev')
  .provides((b) => b
    .add(bindService(SagaStorage, InMemorySagaStorage))
    .add(SagaStorageService)
    .add(OrderSaga)
  );
// For production - use PostgreSQL storage
import { PostgresSagaStorage } from '@justscale/postgres';
export const ProdFeature = createFeatureBuilder()
  .name('prod')
  .provides((b) => b
    .add(bindService(SagaStorage, PostgresSagaStorage))
    .add(SagaStorageService)
    .add(OrderSaga)
  );
In-Memory Storage
Use InMemorySagaStorage for development and testing. Data is lost on restart, so it is not suitable for production:
import {
  InMemorySagaStorage,
  createInMemorySagaStorage,
} from '@justscale/saga';
// Option 1: Instantiate the class (the same class is used for DI binding)
const storage = new InMemorySagaStorage();
// Option 2: Use the factory function
const factoryStorage = createInMemorySagaStorage();
// Storage methods (`saga` is a StoredSaga; `workflowId` is its ID)
await storage.save(saga);
const loaded = await storage.load(workflowId);
await storage.delete(workflowId);
// Query waiting sagas
const waiting = await storage.findWaitingFor('payment-confirmed');
const timedOut = await storage.findTimedOut(new Date());
// List and count
const all = await storage.list({ status: 'suspended' });
const count = await storage.count({ sagaPath: 'orders/process' });
// For testing - clear all data
storage.clear();
Executing Sagas with Storage
Use executeSaga and signalSaga with a container that has storage bound:
import { executeSaga, signalSaga } from '@justscale/saga';
import { ProcessOrderSaga } from './sagas';
// Execute a saga (starts or resumes)
const result = await executeSaga(ProcessOrderSaga, container, {
  workflowId: 'order-123',
  body: {
    items: [{ productId: 'prod-1', quantity: 2 }],
  },
  params: { orderId: 'order-123' },
});
if (result.status === 'suspended') {
  console.log('Saga waiting for:', result.waitCondition?.event);
  // Store workflowId for later signal
}
if (result.status === 'completed') {
  console.log('Saga completed:', result.output);
}
// Later, send a signal to resume
await signalSaga(
  ProcessOrderSaga,
  container,
  storage, // SagaStorage instance
  'order-123',
  'payment-confirmed',
  { transactionId: 'txn-456', amount: 99.99 }
);
HTTP Integration
The sagaStart and sagaSignal plugins handle HTTP-to-saga integration automatically:
import { createController, Post } from '@justscale/core';
import { body } from '@justscale/http';
import { sagaStart, sagaSignal } from '@justscale/saga';
import { z } from 'zod'; // assumed: the z.object(...) schemas below follow zod's API
import { ProcessOrderSaga } from './sagas';
// ItemSchema is assumed to be a schema for one order item, defined elsewhere
export const OrderController = createController('/orders', {
  routes: (_, { Post }) => ({
    // Start a new order saga
    create: Post('/')
      .apply(body(z.object({
        items: z.array(ItemSchema),
      })))
      .apply(sagaStart(ProcessOrderSaga))
      .handle(({ sagaResult }) => {
        // sagaResult has workflowId, status, output/error
        if (sagaResult.status === 'suspended') {
          return {
            workflowId: sagaResult.workflowId,
            status: 'pending',
            waitingFor: sagaResult.waitCondition?.event,
          };
        }
        return {
          workflowId: sagaResult.workflowId,
          status: 'completed',
          result: sagaResult.output,
        };
      }),
    // Confirm payment (sends signal)
    confirmPayment: Post('/:workflowId/confirm-payment')
      .apply(body(z.object({
        transactionId: z.string(),
        amount: z.number(),
      })))
      .apply(sagaSignal(ProcessOrderSaga, 'payment-confirmed'))
      .handle(({ signalResult }) => signalResult),
  }),
});
Stored Saga Structure
The StoredSaga interface defines what gets persisted:
interface StoredSaga<TBody = unknown, TState = unknown> {
  // Identifiers
  workflowId: string;              // Unique execution ID
  sagaPath: string;                // Saga path (e.g., 'orders/process')
  correlationId?: string;          // Business correlation (e.g., order ID)
  // Execution state
  status: 'pending' | 'running' | 'suspended' | 'completed' | 'failed';
  body: TBody;                     // Original input
  params: Record<string, string>;  // Path parameters
  state?: TState;                  // User-defined state
  output?: unknown;                // Final output (if completed)
  error?: string;                  // Error message (if failed)
  // Suspension info
  waitCondition?: {
    event?: string;                // Signal being waited for
    resumeAt?: Date;               // When to resume (for sleep)
  };
  // Execution history
  steps: StepRecord[];             // Completed steps with cached results
  compensations: string[];         // Registered compensation step IDs
  // Timestamps
  createdAt: Date;
  updatedAt: Date;
}
Custom Storage Backend
Implement the SagaStorage abstract class for custom backends:
import { SagaStorage, type StoredSaga, type SagaStorageOptions } from '@justscale/saga';
import type { Redis } from 'ioredis'; // assumed client; the calls below follow ioredis's API
export class RedisSagaStorage extends SagaStorage {
  constructor(private redis: Redis) {
    super();
  }
  async save(saga: StoredSaga): Promise<void> {
    await this.redis.hset(
      `saga:${saga.workflowId}`,
      'data',
      JSON.stringify(saga)
    );
    // Index by correlation ID
    if (saga.correlationId) {
      await this.redis.set(
        `saga:corr:${saga.correlationId}`,
        saga.workflowId
      );
    }
    // Index sleeping sagas for polling; drop the entry once the saga stops sleeping
    if (saga.status === 'suspended' && saga.waitCondition?.resumeAt) {
      await this.redis.zadd(
        'saga:sleeping',
        saga.waitCondition.resumeAt.getTime(),
        saga.workflowId
      );
    } else {
      await this.redis.zrem('saga:sleeping', saga.workflowId);
    }
  }
  async load(workflowId: string): Promise<StoredSaga | undefined> {
    const data = await this.redis.hget(`saga:${workflowId}`, 'data');
    if (!data) return undefined;
    // JSON turns Date fields into ISO strings; convert the known ones back
    const saga = JSON.parse(data) as StoredSaga;
    saga.createdAt = new Date(saga.createdAt);
    saga.updatedAt = new Date(saga.updatedAt);
    if (saga.waitCondition?.resumeAt) {
      saga.waitCondition.resumeAt = new Date(saga.waitCondition.resumeAt);
    }
    return saga;
  }
  async delete(workflowId: string): Promise<void> {
    // Load first so the correlation index entry can be removed as well
    const saga = await this.load(workflowId);
    await this.redis.del(`saga:${workflowId}`);
    await this.redis.zrem('saga:sleeping', workflowId);
    if (saga?.correlationId) {
      await this.redis.del(`saga:corr:${saga.correlationId}`);
    }
  }
  async findWaitingFor(event: string, correlationId?: string): Promise<StoredSaga[]> {
    // Scan for suspended sagas waiting for this event.
    // KEYS is O(N) and blocks Redis; prefer SCAN or a dedicated index in production.
    const keys = await this.redis.keys('saga:*');
    const results: StoredSaga[] = [];
    for (const key of keys) {
      // Skip the index keys, which are not hashes
      if (key === 'saga:sleeping' || key.startsWith('saga:corr:')) continue;
      const saga = await this.load(key.slice('saga:'.length));
      if (!saga) continue;
      if (saga.status !== 'suspended') continue;
      if (saga.waitCondition?.event !== event) continue;
      if (correlationId && saga.correlationId !== correlationId) continue;
      results.push(saga);
    }
    return results;
  }
  async findTimedOut(now: Date): Promise<StoredSaga[]> {
    // Sleeping sagas are scored by resumeAt, so this returns those already due
    const ids = await this.redis.zrangebyscore(
      'saga:sleeping',
      0,
      now.getTime()
    );
    const sagas = await Promise.all(
      ids.map((id) => this.load(id))
    );
    return sagas.filter(Boolean) as StoredSaga[];
  }
  async list(options?: SagaStorageOptions): Promise<StoredSaga[]> {
    // Implementation depends on your indexing strategy
    // ...
  }
  async count(options?: SagaStorageOptions): Promise<number> {
    const list = await this.list(options);
    return list.length;
  }
}
Resuming Sleeping Sagas
Sleeping sagas have a resumeAt timestamp. Poll for and resume them with a background worker:
import { executeSaga, SagaStorage } from '@justscale/saga';
import { sagaRegistry } from './sagas';
// Container is the DI container type used by your application
// Poll for sagas ready to resume
async function processSleepingSagas(
  storage: SagaStorage,
  container: Container
) {
  const now = new Date();
  // Find sagas where sleep has elapsed
  const ready = await storage.findTimedOut(now);
  for (const saga of ready) {
    // Get the saga definition from registry
    const sagaDef = sagaRegistry[saga.sagaPath];
    if (!sagaDef) {
      console.warn(`Unknown saga: ${saga.sagaPath}`);
      continue;
    }
    try {
      // Resume execution
      const result = await executeSaga(sagaDef, container, {
        workflowId: saga.workflowId,
        body: saga.body,
        params: saga.params,
        resumeFrom: saga, // Pass stored state for resume
      });
      console.log(`Resumed saga ${saga.workflowId}: ${result.status}`);
    } catch (err) {
      console.error(`Failed to resume ${saga.workflowId}:`, err);
    }
  }
}
// Run every minute; catch errors so a failed poll is not an unhandled rejection
setInterval(() => {
  processSleepingSagas(storage, container).catch((err) =>
    console.error('Sleeping-saga poll failed:', err)
  );
}, 60_000);
Querying Saga State
Query saga executions for monitoring, debugging, or business logic:
// Find by workflow ID
const saga = await storage.load('order-123');
// Find all suspended sagas
const suspended = await storage.list({
  status: 'suspended',
});
// Find sagas by path
const orderSagas = await storage.list({
  sagaPath: 'orders/process',
});
// Find sagas waiting for specific signal
const waitingForPayment = await storage.findWaitingFor('payment-confirmed');
// Find by correlation ID (filters the list in memory; a custom index avoids the scan)
const orderSaga = suspended.find((s) => s.correlationId === 'order-456');
// Get saga progress from state
if (saga) {
  console.log('Status:', saga.status);
  console.log('State:', saga.state);
  console.log('Steps completed:', saga.steps.length);
  console.log('Waiting for:', saga.waitCondition?.event);
}
Best Practices
- Use correlation IDs - Link sagas to business entities (order ID, user ID) for easy lookup from webhooks and APIs.
- Idempotent steps - Steps may replay on resume. Ensure side effects are idempotent or rely on step caching to prevent re-execution.
- Meaningful step IDs - Use descriptive IDs like charge-payment, not step-1. Include context in loops: `process-item-${item.id}`.
- Poll for sleeping sagas - Run a background worker to resume sagas where sleep has elapsed.
- Monitor saga state - Build dashboards to track pending, waiting, and failed sagas for operational visibility.
- Use typed signals - Declare signals in createSaga for compile-time safety and runtime validation.
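The points about idempotent steps and meaningful step IDs can be illustrated with a small sketch of step caching. This is not JustScale's implementation: `ToyStepRecord`, `runStep`, `processItems`, and `demo` are hypothetical names used only here. It shows why a step ID that includes loop context lets a replay skip work that has already been done.

```typescript
// Hypothetical step record; stands in for the cached step entries of a stored saga.
interface ToyStepRecord {
  id: string;
  result: unknown;
}

// Runs `fn` at most once per step ID. On replay, the cached result is returned
// and the side effect inside `fn` is not repeated.
async function runStep<T>(
  steps: ToyStepRecord[],
  id: string,
  fn: () => Promise<T>,
): Promise<T> {
  const cached = steps.find((s) => s.id === id);
  if (cached) return cached.result as T;
  const result = await fn();
  steps.push({ id, result });
  return result;
}

async function processItems(
  steps: ToyStepRecord[],
  itemIds: string[],
  charge: (itemId: string) => void,
): Promise<void> {
  for (const itemId of itemIds) {
    // The ID carries the loop context, so each iteration is cached separately.
    await runStep(steps, `process-item-${itemId}`, async () => {
      charge(itemId); // side effect that must not run twice for the same item
      return { itemId, ok: true };
    });
  }
}

async function demo(): Promise<string[]> {
  const steps: ToyStepRecord[] = [];
  const charged: string[] = [];
  const charge = (itemId: string) => { charged.push(itemId); };
  await processItems(steps, ['a', 'b'], charge);      // first run
  await processItems(steps, ['a', 'b', 'c'], charge); // replay: only 'c' executes
  return charged;
}
```

In `demo`, the second call replays the steps for items a and b from the cache, so each item is charged exactly once. A generic ID such as step-1 reused across iterations would instead return the first item's cached result for every later item.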