Runtime & Testing
Setting up the process executor and testing durable processes
Durable processes need a runtime to execute. The runtime consists of three pluggable components: storage (persists state), signal bus (routes signals), and timer scheduler (manages durable timers).
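To make the division of responsibilities concrete, here is a minimal, self-contained sketch of three such components. Every name in it (`SketchStorage`, `SketchSignalBus`, `SketchTimerScheduler`, and the `createSketch*` factories) is a hypothetical stand-in invented for this illustration; the actual `@justscale/process` interfaces may look quite different.

```typescript
// Hypothetical shapes for illustration only; not the @justscale/process API.
type SketchState = { instanceId: string; status: string }

interface SketchStorage {
  save(state: SketchState): void
  load(instanceId: string): SketchState | undefined
}

interface SketchSignalBus {
  subscribe(signal: string, listener: (payload: unknown) => void): void
  emit(signal: string, payload: unknown): void
}

interface SketchTimerScheduler {
  schedule(dueAt: number, callback: () => void): void
  runDue(now: number): void
}

// Storage: keeps the latest state per instance in a Map.
function createSketchStorage(): SketchStorage {
  const states = new Map<string, SketchState>()
  return {
    save: (state) => { states.set(state.instanceId, { ...state }) },
    load: (instanceId) => states.get(instanceId),
  }
}

// Signal bus: routes a named signal to every listener registered for it.
function createSketchSignalBus(): SketchSignalBus {
  const listeners = new Map<string, Array<(payload: unknown) => void>>()
  return {
    subscribe(signal, listener) {
      const existing = listeners.get(signal) ?? []
      listeners.set(signal, [...existing, listener])
    },
    emit(signal, payload) {
      for (const listener of listeners.get(signal) ?? []) listener(payload)
    },
  }
}

// Timer scheduler: fires each callback once, when its due time has passed.
function createSketchTimerScheduler(): SketchTimerScheduler {
  let pending: Array<{ dueAt: number; callback: () => void }> = []
  return {
    schedule(dueAt, callback) { pending.push({ dueAt, callback }) },
    runDue(now) {
      const due = pending.filter((timer) => timer.dueAt <= now)
      pending = pending.filter((timer) => timer.dueAt > now)
      for (const timer of due) timer.callback()
    },
  }
}
```

Because each component sits behind its own small interface, a backend can be swapped (for example, a database-backed storage) without touching the other two, which is the property the pluggable runtime relies on.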
In-Memory Runtime
For development and testing, use the in-memory runtime. State is lost on restart, but it's fast and requires no external dependencies:
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/process'
import { container } from './container'
// Create runtime with all components
const runtime = createInMemoryRuntime({ container })
// Set as global executor (required for signal emission)
setProcessExecutor(runtime.executor)
// Now processes can be started and signals emitted
import { OrderFulfillment } from './processes'
const handle = await OrderFulfillment(['order-123'])
console.log(handle.status) // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'
Production Runtime
For production, plug in persistent backends. JustScale provides PostgreSQL adapters:
import { createProcessExecutor, setProcessExecutor, ScheduledTaskTimerScheduler } from '@justscale/process'
import { PgProcessStorage, PgSignalBus } from '@justscale/postgres'
import { container } from './container'
// PostgreSQL-backed runtime.
// pgClient, pubsub, and scheduledTaskRepo are assumed to be created elsewhere in your application.
const executor = createProcessExecutor({
  container,
  storage: new PgProcessStorage(pgClient),
  signalBus: new PgSignalBus(pubsub),
  timerScheduler: new ScheduledTaskTimerScheduler(scheduledTaskRepo),
})
setProcessExecutor(executor)
// Start processing timers
executor.startTimers()
Process Handle
Starting a process returns a handle with status info and a promise for the result:
import { OrderFulfillment } from './processes'
// Start a process
const handle = await OrderFulfillment(['order-123'])
// Handle properties
handle.id // 'order/order-123/fulfillment' (instance ID)
handle.status // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'
// Wait for completion (blocks until process finishes)
const result = await handle.wait()
// result is typed based on your handler's return type
// Check status without blocking
if (handle.status === 'suspended') {
  console.log('Waiting for signal...')
}
// Idempotent starts: starting again with the same params returns the existing handle
const handle2 = await OrderFulfillment(['order-123'])
console.log(handle.id === handle2.id) // true
Process Lifecycle
Processes move through these states:
- pending - Created but not yet executing
- running - Currently executing blocks
- suspended - Waiting for signal or timer
- completed - Finished successfully with result
- failed - Threw an error, captured in state
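The states listed above can be modeled as a small transition table. This is an illustrative model only: the `ProcessStatus` type is declared locally as a stand-in, `allowedTransitions`, `canTransition`, and `transition` are hypothetical names, and treating `completed` and `failed` as terminal is an assumption rather than something the runtime is documented to enforce.

```typescript
// Local stand-in for the status union; declared here so the sketch is self-contained.
type ProcessStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

// Transitions described in this section. Terminal states are assumed to have none.
const allowedTransitions: Record<ProcessStatus, readonly ProcessStatus[]> = {
  pending: ['running'],
  running: ['suspended', 'completed', 'failed'],
  suspended: ['running'],
  completed: [],
  failed: [],
}

// Returns true when moving from one status to another is part of the model.
function canTransition(from: ProcessStatus, to: ProcessStatus): boolean {
  return allowedTransitions[from].includes(to)
}

// Returns the new status, or throws if the move is not part of the model.
function transition(from: ProcessStatus, to: ProcessStatus): ProcessStatus {
  if (!canTransition(from, to)) {
    throw new Error(`Invalid transition: ${from} -> ${to}`)
  }
  return to
}
```

A table like this is handy in tests, for example to assert that a suspended process is never observed as completed without passing through running first.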
// pending → running (start executing)
const handle = await MyProcess(['id']) // status: 'pending' → 'running'
// running → suspended (hit signal/delay)
await signal(service.someEvent) // status: 'suspended'
// suspended → running (signal emitted)
await service.someEvent('id', payload) // status: 'running'
// running → completed (handler returns)
return { success: true } // status: 'completed'
// running → failed (handler throws)
throw new Error('Oops') // status: 'failed'
Duration Helpers
Use duration helpers with delay() for readable timeouts:
import { delay, seconds, minutes, hours, days } from '@justscale/process'
// All durations are converted to milliseconds internally
await delay(seconds(30)) // 30 seconds
await delay(minutes(5)) // 5 minutes
await delay(hours(24)) // 24 hours
await delay(days(7)) // 7 days
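The note that durations end up as milliseconds can be pictured with a small stand-alone sketch. The `*ToMs` functions below are hypothetical stand-ins written for this example; whether the real `seconds`, `minutes`, `hours`, and `days` helpers return plain numbers or a richer duration value is not stated here.

```typescript
// Hypothetical re-implementations for illustration; the real helpers may differ.
const MS_PER_SECOND = 1000

const secondsToMs = (n: number): number => n * MS_PER_SECOND
const minutesToMs = (n: number): number => secondsToMs(n * 60)
const hoursToMs = (n: number): number => minutesToMs(n * 60)
const daysToMs = (n: number): number => hoursToMs(n * 24)

console.log(secondsToMs(30)) // 30000
console.log(minutesToMs(5))  // 300000
console.log(hoursToMs(24))   // 86400000
console.log(daysToMs(7))     // 604800000
```

Building each unit on top of the next smaller one keeps the conversion factors (60, 60, 24) in a single place each, which avoids the hand-written `24 * 60 * 60 * 1000` arithmetic that is easy to get wrong.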
// In a race
const r = race()
switch (true) {
  case signal(r, orders.shipped):
    return { shipped: true }
  case delay(r, days(30)):
    return { expired: true, reason: '30-day shipping window exceeded' }
}
Testing Processes
The in-memory runtime provides direct access to components for testing:
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert'
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/process'
import { TestContainer } from '@justscale/testing'
import { OrderFulfillment } from './processes'
import { OrderService, ShippingService } from './services'
describe('OrderFulfillment', () => {
  let runtime: ReturnType<typeof createInMemoryRuntime>
  let container: TestContainer
  beforeEach(() => {
    // mockOrderService and mockShippingService are test doubles defined elsewhere
    container = new TestContainer()
    container.register(OrderService, mockOrderService)
    container.register(ShippingService, mockShippingService)
    runtime = createInMemoryRuntime({ container })
    setProcessExecutor(runtime.executor)
  })
  afterEach(() => {
    runtime.stop()
    runtime.clear()
  })
  it('completes when all signals are received', async () => {
    // Start process
    const handle = await OrderFulfillment(['order-123'])
    assert.strictEqual(handle.status, 'suspended')
    // Emit payment signal
    await runtime.executor.emit('orders.paymentReceived',
      { orderId: 'order-123' },
      { txId: 'tx-456' }
    )
    // Still suspended (now waiting for the shipped signal)
    const state = await runtime.storage.load(handle.id)
    assert.strictEqual(state?.status, 'suspended')
    // Emit shipped signal
    await runtime.executor.emit('orders.shipped',
      { orderId: 'order-123' },
      { trackingNumber: 'TRACK-789' }
    )
    // Wait for completion
    const result = await handle.wait()
    assert.deepStrictEqual(result, {
      status: 'completed',
      orderId: 'order-123',
    })
  })
})
Testing Timeouts
The in-memory timer scheduler supports time manipulation for testing:
it('expires after timeout', async () => {
  const handle = await PaymentProcess(['pay-123'])
  assert.strictEqual(handle.status, 'suspended')
  // Advance time past the timeout
  const futureDate = new Date(Date.now() + 25 * 60 * 60 * 1000) // +25 hours
  runtime.timerScheduler.advanceTo(futureDate)
  // Process should complete with timeout result
  const result = await handle.wait()
  assert.strictEqual(result.status, 'timeout')
})
Error Handling
When a block throws, the process is marked as failed and the error is captured:
// Process that might fail
export const RiskyProcess = createProcess({
  path: '/risky/:id',
  inject: { service: RiskyService },
  async handler({ service }, [id]) {
    // If this throws, process goes to 'failed' status
    const result = await service.doRiskyThing(id)
    return { success: true, result }
  },
})
// In your code
const handle = await RiskyProcess(['123'])
try {
  const result = await handle.wait()
  console.log('Success:', result)
} catch (error) {
  // Error from the process. Caught values are `unknown` in strict TypeScript, so narrow first.
  console.error('Process failed:', error instanceof Error ? error.message : error)
}
// Check state directly
const state = await runtime.storage.load(handle.id)
if (state?.status === 'failed') {
  console.log('Error:', state.error)
}
Scoped Executor
Use withExecutor to scope an executor to a specific code block (useful for tests or multi-tenant scenarios):
import { withExecutor, createInMemoryRuntime } from '@justscale/process'
// Create isolated runtime for this scope
const runtime = createInMemoryRuntime({ container })
await withExecutor(runtime.executor, async () => {
  // All process operations in here use this executor
  const handle = await MyProcess(['id'])
  await service.emitSignal('id', payload)
  const result = await handle.wait()
})
// Outside the scope, the previous executor is restored
Storage Interface
Implement ProcessStorage to use custom backends:
import type { ProcessStorage, ProcessState, ProcessStatus } from '@justscale/process'
class RedisProcessStorage implements ProcessStorage {
  constructor(private redis: RedisClient) {}
  async save(state: ProcessState): Promise<void> {
    await this.redis.set(`process:${state.instanceId}`, JSON.stringify(state))
  }
  async load(instanceId: string): Promise<ProcessState | null> {
    const data = await this.redis.get(`process:${instanceId}`)
    return data ? JSON.parse(data) : null
  }
  async delete(instanceId: string): Promise<void> {
    await this.redis.del(`process:${instanceId}`)
  }
  async complete(instanceId: string, result: unknown): Promise<void> {
    const state = await this.load(instanceId)
    if (state) {
      state.status = 'completed'
      state.result = result
      await this.save(state)
    }
  }
  // ... other methods
}
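For trying out a custom backend without Redis, the same four methods can be backed by a `Map`. This is a sketch under stated assumptions: `SketchProcessState` and `SketchProcessStorage` are local stand-ins that include only the fields and methods visible in the excerpt above (the real `ProcessStorage` interface has further methods that are elided there), and `MapProcessStorage` is a hypothetical class name.

```typescript
// Local stand-ins: only the fields and methods visible in the excerpt above.
interface SketchProcessState {
  instanceId: string
  status: 'pending' | 'running' | 'suspended' | 'completed' | 'failed'
  result?: unknown
}

interface SketchProcessStorage {
  save(state: SketchProcessState): Promise<void>
  load(instanceId: string): Promise<SketchProcessState | null>
  delete(instanceId: string): Promise<void>
  complete(instanceId: string, result: unknown): Promise<void>
}

class MapProcessStorage implements SketchProcessStorage {
  // Values are stored as JSON strings so callers never share object references,
  // mirroring what a serializing backend such as Redis would do.
  private readonly entries = new Map<string, string>()

  async save(state: SketchProcessState): Promise<void> {
    this.entries.set(state.instanceId, JSON.stringify(state))
  }

  async load(instanceId: string): Promise<SketchProcessState | null> {
    const data = this.entries.get(instanceId)
    return data === undefined ? null : (JSON.parse(data) as SketchProcessState)
  }

  async delete(instanceId: string): Promise<void> {
    this.entries.delete(instanceId)
  }

  async complete(instanceId: string, result: unknown): Promise<void> {
    const state = await this.load(instanceId)
    if (state === null) return // unknown instance: nothing to complete
    await this.save({ ...state, status: 'completed', result })
  }
}
```

Storing serialized strings rather than live objects is a deliberate choice here: it surfaces non-serializable state (functions, class instances) during development instead of only after switching to a persistent backend.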