Runtime & Testing
Setting up the process executor and testing durable processes
Durable processes need a runtime to execute. The runtime consists of three pluggable components: storage (persists state), signal bus (routes signals), and timer scheduler (manages durable timers).
In-Memory Runtime
For development and testing, use the in-memory runtime. State is lost on restart, but it's fast and requires no external dependencies:
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/core/process'
import { container } from './container'
// Create runtime with all components
const runtime = createInMemoryRuntime({ container })
// Set as global executor (required for signal emission)
setProcessExecutor(runtime.executor)
// Now processes can be started and signals emitted
import { OrderFulfillment } from './processes'
const handle = await OrderFulfillment(['order-123'])
console.log(handle.status) // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'Production Runtime
For production, plug in persistent backends. JustScale provides PostgreSQL adapters:
import { defineApp } from '@justscale/core'
import JustScale from '@justscale/core'
import {
PostgresFeature,
PostgresChannelFeature,
PostgresLockFeature,
PostgresProcessFeature,
PostgresMigrationFeature,
} from '@justscale/postgres'
import type { AppEnv } from './env-contract.js'
// defineApp handles env loading, build/compile, and CLI-vs-serve dispatch.
// PostgresProcessFeature provides the process executor, signal bus, and
// scheduled-task timers; it requires client + channel backend + lock
// provider, which the three features above register.
export default defineApp(import.meta, (env: AppEnv) =>
JustScale()
.add(env)
.add(PostgresFeature)
.add(PostgresChannelFeature)
.add(PostgresLockFeature)
.add(PostgresProcessFeature)
.add(PostgresMigrationFeature),
)Process Handle
Starting a process returns a handle with status info and a promise for the result:
import { OrderFulfillment } from './processes'
// Start a process
const handle = await OrderFulfillment(['order-123'])
// Handle properties
handle.id // 'order/order-123/fulfillment' (instance ID)
handle.status // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'
// Wait for completion (blocks until process finishes)
const result = await handle.wait()
// result is typed based on your handler's return type
// Check status without blocking
if (handle.status === 'suspended') {
console.log('Waiting for signal...')
}
// Idempotent starts - same params returns existing handle
const handle2 = await OrderFulfillment(['order-123'])
console.log(handle.id === handle2.id) // trueProcess Lifecycle
Processes move through these states:
- pending - Created but not yet executing
- running - Currently executing blocks
- suspended - Waiting for signal or timer
- completed - Finished successfully with result
- failed - Threw an unrecoverable error, captured in state
// pending → running (start executing)
const handle = await MyProcess(['id']) // status: 'pending' → 'running'
// running → suspended (hit signal/delay)
await signal(service.someEvent) // status: 'suspended'
// suspended → running (signal emitted)
await service.someEvent('id', payload) // status: 'running'
// running → completed (handler returns)
return { success: true } // status: 'completed'
// running → failed (handler throws)
throw new Error('Oops') // status: 'failed'Recoverable Errors (DoubleLockError)
Not every handler error terminates the process. DoubleLockError — thrown when a handler tries to acquire a lock it already holds in the same async context — is treated as recoverable: the process stays suspended at its prior step with a lastError marker, so a future execution (next signal firing, or a post-deploy restart running fixed code) can retry from the same point.
The rationale: durable processes outlive deploys. Terminating a multi-week process because of a bug that lands in the next PR would destroy state. Since using lock = await repo.lock(...) runs before mutations by design, a DoubleLockError throw never leaves the process in a half-mutated state — pausing is safe.
// Bug: handler locks the same entity twice in one pass
async handler({ orders }, { order }) {
using a = await orders.lock(order)
using b = await orders.lock(order) // → DoubleLockError
// mutations below never run
}
// Result: process state
{
status: 'suspended', // NOT 'failed' — recoverable
error: undefined, // terminal-error slot untouched
lastError: 'Cannot acquire lock "lock:Order:abc" — ... ' +
'(process orders/... step 0)',
lastErrorAt: Date
}
// Ship a fix, redeploy. On next signal firing OR restart,
// executor re-runs handler with new code. If it now succeeds,
// lastError / lastErrorAt are cleared and the process advances.The error is still loud: the executor logs it at ERROR level with [ProcessExecutor] prefix and the full process/step context, so monitoring picks it up. Only the persistence behavior differs from a terminal failure.
Introspecting lastError
// Query suspended processes with a recoverable-error marker
for await (const state of executor.queryByStatus('suspended')) {
if (state.lastError) {
console.warn(
`Stuck process ${state.instanceId}: ${state.lastError} ` +
`(since ${state.lastErrorAt?.toISOString()})`
)
}
}Only DoubleLockError gets this treatment. Any other thrown error still transitions the process to failed with the terminal error field set.
Duration Helpers
delay exposes unit-named functions rather than a separate seconds()/minutes() helper. In a race, the first argument is the race() handle; the second is the numeric duration.
import { delay, race, signal } from '@justscale/core/process'
// Standalone (outside a race) — await the suspension directly
await delay.seconds(30)
await delay.minutes(5)
await delay.hours(24)
await delay.days(7)
// In a race — first arg is the race handle
const r = race()
switch (true) {
case signal(r, orders.shipped):
return { shipped: true }
case delay.days(r, 30):
return { expired: true, reason: '30-day shipping window exceeded' }
}Testing Processes
The in-memory runtime provides direct access to components for testing:
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert'
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/core/process'
import { TestContainer } from '@justscale/testing'
import { OrderFulfillment } from './processes'
import { OrderService, ShippingService } from './services'
describe('OrderFulfillment', () => {
let runtime: ReturnType<typeof createInMemoryRuntime>
let container: TestContainer
beforeEach(() => {
container = new TestContainer()
container.register(OrderService, mockOrderService)
container.register(ShippingService, mockShippingService)
runtime = createInMemoryRuntime({ container })
setProcessExecutor(runtime.executor)
})
afterEach(() => {
runtime.stop()
runtime.clear()
})
it('completes when all signals received', async () => {
// Start process
const handle = await OrderFulfillment(['order-123'])
assert.strictEqual(handle.status, 'suspended')
// Emit payment signal
await runtime.executor.emit('orders.paymentReceived',
{ orderId: 'order-123' },
{ txId: 'tx-456' }
)
// Check suspended again (waiting for shipped)
const state = await runtime.storage.load(handle.id)
assert.strictEqual(state?.status, 'suspended')
// Emit shipped signal
await runtime.executor.emit('orders.shipped',
{ orderId: 'order-123' },
{ trackingNumber: 'TRACK-789' }
)
// Wait for completion
const result = await handle.wait()
assert.deepStrictEqual(result, {
status: 'completed',
orderId: 'order-123',
})
})
})Testing Timeouts
The in-memory timer scheduler supports time manipulation for testing:
it('expires after timeout', async () => {
const handle = await PaymentProcess(['pay-123'])
assert.strictEqual(handle.status, 'suspended')
// Advance time past the timeout
const futureDate = new Date(Date.now() + 25 * 60 * 60 * 1000) // +25 hours
runtime.timerScheduler.advanceTo(futureDate)
// Process should complete with timeout result
const result = await handle.wait()
assert.strictEqual(result.status, 'timeout')
})Error Handling
When a block throws, the process is marked as failed and the error is captured:
// Process that might fail
export const RiskyProcess = createProcess({
path: '/risky/:id',
inject: { service: RiskyService },
async handler({ service }, [id]) {
// If this throws, process goes to 'failed' status
const result = await service.doRiskyThing(id)
return { success: true, result }
},
})
// In your code
const handle = await RiskyProcess(['123'])
try {
const result = await handle.wait()
console.log('Success:', result)
} catch (error) {
// Error from the process
console.error('Process failed:', error.message)
}
// Check state directly
const state = await runtime.storage.load(handle.id)
if (state?.status === 'failed') {
console.log('Error:', state.error)
}Scoped Executor
Use withExecutor to scope an executor to a specific code block (useful for tests or multi-tenant scenarios):
import { withExecutor, createInMemoryRuntime } from '@justscale/core/process'
// Create isolated runtime for this scope
const runtime = createInMemoryRuntime({ container })
await withExecutor(runtime.executor, async () => {
// All process operations in here use this executor
const handle = await MyProcess(['id'])
await service.emitSignal('id', payload)
const result = await handle.wait()
})
// Outside the scope, the previous executor is restoredStorage Interface
Implement ProcessStorage to use custom backends:
import type { ProcessStorage, ProcessState, ProcessStatus } from '@justscale/core/process'
class RedisProcessStorage implements ProcessStorage {
constructor(private redis: RedisClient) {}
async save(state: ProcessState): Promise<void> {
await this.redis.set(`process:${state.instanceId}`, JSON.stringify(state))
}
async load(instanceId: string): Promise<ProcessState | null> {
const data = await this.redis.get(`process:${instanceId}`)
return data ? JSON.parse(data) : null
}
async delete(instanceId: string): Promise<void> {
await this.redis.del(`process:${instanceId}`)
}
async complete(instanceId: string, result: unknown): Promise<void> {
const state = await this.load(instanceId)
if (state) {
state.status = 'completed'
state.result = result
await this.save(state)
}
}
// ... other methods
}