Runtime & Testing

Setting up the process executor and testing durable processes

Durable processes need a runtime to execute. The runtime consists of three pluggable components: storage (persists state), signal bus (routes signals), and timer scheduler (manages durable timers).

In-Memory Runtime

For development and testing, use the in-memory runtime. State is lost on restart, but it's fast and requires no external dependencies:

setup.ts (TypeScript)
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/core/process'
import { container } from './container'
import { OrderFulfillment } from './processes'

// Create runtime with all components
const runtime = createInMemoryRuntime({ container })

// Set as global executor (required for signal emission)
setProcessExecutor(runtime.executor)

// Now processes can be started and signals emitted
const handle = await OrderFulfillment(['order-123'])
console.log(handle.status) // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

Production Runtime

For production, plug in persistent backends. JustScale provides PostgreSQL adapters:

app.ts (TypeScript)
import JustScale, { defineApp } from '@justscale/core'
import {
  PostgresFeature,
  PostgresChannelFeature,
  PostgresLockFeature,
  PostgresProcessFeature,
  PostgresMigrationFeature,
} from '@justscale/postgres'
import type { AppEnv } from './env-contract.js'

// defineApp handles env loading, build/compile, and CLI-vs-serve dispatch.
// PostgresProcessFeature provides the process executor, signal bus, and
// scheduled-task timers. It requires a client, a channel backend, and a lock
// provider, which the three features added before it register.
export default defineApp(import.meta, (env: AppEnv) =>
  JustScale()
    .add(env)
    .add(PostgresFeature)
    .add(PostgresChannelFeature)
    .add(PostgresLockFeature)
    .add(PostgresProcessFeature)
    .add(PostgresMigrationFeature),
)

Process Handle

Starting a process returns a handle with status info and a promise for the result:

handle-api.ts (TypeScript)
import { OrderFulfillment } from './processes'

// Start a process
const handle = await OrderFulfillment(['order-123'])

// Handle properties
handle.id      // 'order/order-123/fulfillment' (instance ID)
handle.status  // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

// Wait for completion (the promise resolves when the process finishes)
const result = await handle.wait()
// result is typed based on your handler's return type

// Check status without blocking
if (handle.status === 'suspended') {
  console.log('Waiting for signal...')
}

// Idempotent starts: the same params return the existing handle
const handle2 = await OrderFulfillment(['order-123'])
console.log(handle.id === handle2.id) // true
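
One way to picture the idempotent start shown above is a lookup keyed by instance ID. The sketch below is a standalone model, not JustScale's implementation: startOnce, SketchHandle, and the handles map are hypothetical names, and only the ID format is taken from the example above.

```typescript
// Standalone model of idempotent starts; none of these names come from JustScale.
type SketchStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

interface SketchHandle {
  id: string
  status: SketchStatus
}

// One handle per instance ID, so repeated starts find the existing entry.
const handles = new Map<string, SketchHandle>()

function startOnce(orderId: string): SketchHandle {
  // Derive the instance ID from the params (same format as handle.id above).
  const id = `order/${orderId}/fulfillment`
  const existing = handles.get(id)
  if (existing) return existing

  const created: SketchHandle = { id, status: 'pending' }
  handles.set(id, created)
  return created
}

const first = startOnce('order-123')
const second = startOnce('order-123')
console.log(first === second) // true: the second call reuses the first handle
```

A durable runtime would presumably do this lookup against its storage backend rather than an in-process map, so that the guarantee survives restarts.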

Process Lifecycle

Processes move through these states:

  • pending - Created but not yet executing
  • running - Currently executing blocks
  • suspended - Waiting for signal or timer
  • completed - Finished successfully with result
  • failed - Threw an unrecoverable error, captured in state

lifecycle.ts (TypeScript)
// pending → running (start executing)
const handle = await MyProcess(['id'])  // status: 'pending' → 'running'

// running → suspended (hit signal/delay)
await signal(service.someEvent)  // status: 'suspended'

// suspended → running (signal emitted)
await service.someEvent('id', payload)  // status: 'running'

// running → completed (handler returns)
return { success: true }  // status: 'completed'

// running → failed (handler throws)
throw new Error('Oops')  // status: 'failed'
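
The transitions listed above can be written down as a small lookup table, which is handy for assertions in tests. This is a sketch built only from the states and arrows shown on this page; LifecycleStatus, canTransition, and isTerminal are hypothetical helpers, and the real executor may allow transitions that are not documented here.

```typescript
// Local copy of the status union, so the sketch runs without the library.
type LifecycleStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

// Allowed next states, taken from the lifecycle list above.
const transitions: Record<LifecycleStatus, LifecycleStatus[]> = {
  pending: ['running'],
  running: ['suspended', 'completed', 'failed'],
  suspended: ['running'],
  completed: [], // terminal
  failed: [], // terminal
}

function canTransition(from: LifecycleStatus, to: LifecycleStatus): boolean {
  return transitions[from].indexOf(to) !== -1
}

function isTerminal(status: LifecycleStatus): boolean {
  return transitions[status].length === 0
}

console.log(canTransition('suspended', 'running')) // true
console.log(canTransition('completed', 'running')) // false
```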

Recoverable Errors (DoubleLockError)

Not every handler error terminates the process. DoubleLockError is thrown when a handler tries to acquire a lock it already holds in the same async context, and it is treated as recoverable. The process stays suspended at its prior step with a lastError marker, so a future execution (the next signal firing, or a post-deploy restart running fixed code) can retry from the same point.

The rationale: durable processes outlive deploys. Terminating a multi-week process because of a bug that lands in the next PR would destroy state. Because the lock statement (using lock = await repo.lock(...)) runs before any mutation by design, a DoubleLockError is thrown before anything has changed, so the process is never left half-mutated and pausing is safe.

recoverable-error.ts (TypeScript)
// Bug: handler locks the same entity twice in one pass
async handler({ orders }, { order }) {
  using a = await orders.lock(order)
  using b = await orders.lock(order)  // → DoubleLockError
  // mutations below never run
}

// Result: process state
{
  status: 'suspended',          // NOT 'failed' — recoverable
  error: undefined,             // terminal-error slot untouched
  lastError: 'Cannot acquire lock "lock:Order:abc" — ... ' +
             '(process orders/... step 0)',
  lastErrorAt: Date
}

// Ship a fix, redeploy. On next signal firing OR restart,
// executor re-runs handler with new code. If it now succeeds,
// lastError / lastErrorAt are cleared and the process advances.

The error is still loud: the executor logs it at ERROR level with a [ProcessExecutor] prefix and the full process/step context, so monitoring picks it up. Only the persistence behavior differs from a terminal failure.

Introspecting lastError

introspect.ts (TypeScript)
// Query suspended processes with a recoverable-error marker
for await (const state of executor.queryByStatus('suspended')) {
  if (state.lastError) {
    console.warn(
      `Stuck process ${state.instanceId}: ${state.lastError} ` +
      `(since ${state.lastErrorAt?.toISOString()})`
    )
  }
}

Only DoubleLockError gets this treatment. Any other thrown error still transitions the process to failed with the terminal error field set.
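
The split between recoverable and terminal errors amounts to one branch on the error type. The sketch below models that decision under stated assumptions: the DoubleLockError class here is a local stand-in so the code runs alone, and SketchState and stateAfterThrow are hypothetical names, not the executor's internals.

```typescript
// Local stand-in for the library's error class, defined so the sketch runs alone.
class DoubleLockError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'DoubleLockError'
    // Keep instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, DoubleLockError.prototype)
  }
}

// Trimmed-down state shape using the field names shown in this section.
interface SketchState {
  status: 'suspended' | 'failed'
  error?: string
  lastError?: string
  lastErrorAt?: Date
}

// Decide what to persist after a handler throws.
function stateAfterThrow(thrown: unknown, now: Date): SketchState {
  const message = thrown instanceof Error ? thrown.message : String(thrown)
  if (thrown instanceof DoubleLockError) {
    // Recoverable: stay suspended and record a marker; the terminal slot stays empty.
    return { status: 'suspended', lastError: message, lastErrorAt: now }
  }
  // Any other error is terminal.
  return { status: 'failed', error: message }
}

console.log(stateAfterThrow(new DoubleLockError('lock held'), new Date()).status) // suspended
console.log(stateAfterThrow(new Error('Oops'), new Date()).status) // failed
```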

Duration Helpers

delay exposes unit-named functions rather than a separate seconds()/minutes() helper. In a race, the first argument is the race() handle; the second is the numeric duration.

durations.ts (TypeScript)
import { delay, race, signal } from '@justscale/core/process'

// Standalone (outside a race) — await the suspension directly
await delay.seconds(30)
await delay.minutes(5)
await delay.hours(24)
await delay.days(7)

// In a race — first arg is the race handle
const r = race()
switch (true) {
  case signal(r, orders.shipped):
    return { shipped: true }
  case delay.days(r, 30):
    return { expired: true, reason: '30-day shipping window exceeded' }
}
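
When writing tests around these delays, it helps to compute exactly when a timer of a given unit and amount falls due. The helper below is a hypothetical convenience, not part of @justscale/core; MS_PER_UNIT and dueAt are names made up for this sketch.

```typescript
// Hypothetical helper, not part of @justscale/core: unit-to-millisecond factors.
const MS_PER_UNIT = {
  seconds: 1000,
  minutes: 60 * 1000,
  hours: 60 * 60 * 1000,
  days: 24 * 60 * 60 * 1000,
}

type DelayUnit = keyof typeof MS_PER_UNIT

// Compute when a timer of `amount` units, started at `start`, falls due.
function dueAt(start: Date, unit: DelayUnit, amount: number): Date {
  return new Date(start.getTime() + amount * MS_PER_UNIT[unit])
}

const windowStart = new Date('2024-01-01T00:00:00.000Z')
console.log(dueAt(windowStart, 'days', 30).toISOString()) // 2024-01-31T00:00:00.000Z
```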

Testing Processes

The in-memory runtime provides direct access to components for testing:

process.test.ts (TypeScript)
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert'
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/core/process'
import { TestContainer } from '@justscale/testing'
import { OrderFulfillment } from './processes'
import { OrderService, ShippingService } from './services'

describe('OrderFulfillment', () => {
  let runtime: ReturnType<typeof createInMemoryRuntime>
  let container: TestContainer

  beforeEach(() => {
    container = new TestContainer()
    container.register(OrderService, mockOrderService)
    container.register(ShippingService, mockShippingService)

    runtime = createInMemoryRuntime({ container })
    setProcessExecutor(runtime.executor)
  })

  afterEach(() => {
    runtime.stop()
    runtime.clear()
  })

  it('completes when all signals received', async () => {
    // Start process
    const handle = await OrderFulfillment(['order-123'])
    assert.strictEqual(handle.status, 'suspended')

    // Emit payment signal
    await runtime.executor.emit('orders.paymentReceived',
      { orderId: 'order-123' },
      { txId: 'tx-456' }
    )

    // Check suspended again (waiting for shipped)
    const state = await runtime.storage.load(handle.id)
    assert.strictEqual(state?.status, 'suspended')

    // Emit shipped signal
    await runtime.executor.emit('orders.shipped',
      { orderId: 'order-123' },
      { trackingNumber: 'TRACK-789' }
    )

    // Wait for completion
    const result = await handle.wait()
    assert.deepStrictEqual(result, {
      status: 'completed',
      orderId: 'order-123',
    })
  })
})
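
The emit calls in the test above address a signal by name plus a key identifying the instance. A minimal model of that routing is sketched below. It is illustrative only: SketchSignalBus, waitFor, and the string key are assumptions for this example (the real emit takes a key object), and the actual signal bus is not shown on this page.

```typescript
type SignalListener = (payload: unknown) => void

// Illustrative router: waiters are stored per (signal name, instance key) pair.
class SketchSignalBus {
  private waiting = new Map<string, SignalListener[]>()

  private slot(signal: string, instanceKey: string): string {
    return `${signal}|${instanceKey}`
  }

  // Register interest in one signal for one instance.
  waitFor(signal: string, instanceKey: string, listener: SignalListener): void {
    const slot = this.slot(signal, instanceKey)
    const list = this.waiting.get(slot) ?? []
    list.push(listener)
    this.waiting.set(slot, list)
  }

  // Deliver to every waiter for that signal and instance, then forget them.
  // Returns how many waiters were resumed.
  emit(signal: string, instanceKey: string, payload: unknown): number {
    const slot = this.slot(signal, instanceKey)
    const list = this.waiting.get(slot) ?? []
    this.waiting.delete(slot)
    for (const listener of list) listener(payload)
    return list.length
  }
}

const demoBus = new SketchSignalBus()
demoBus.waitFor('orders.shipped', 'order-123', (payload) => console.log('resumed with', payload))
demoBus.emit('orders.shipped', 'order-123', { trackingNumber: 'TRACK-789' })
```

Waiters are one-shot in this model: once a signal resumes them they are removed, which mirrors a process moving from suspended back to running.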

Testing Timeouts

The in-memory timer scheduler supports time manipulation for testing:

timeout.test.ts (TypeScript)
it('expires after timeout', async () => {
  const handle = await PaymentProcess(['pay-123'])
  assert.strictEqual(handle.status, 'suspended')

  // Advance time past the timeout
  const futureDate = new Date(Date.now() + 25 * 60 * 60 * 1000) // +25 hours
  runtime.timerScheduler.advanceTo(futureDate)

  // Process should complete with timeout result
  const result = await handle.wait()
  assert.strictEqual(result.status, 'timeout')
})
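
To make the idea of time manipulation concrete, here is a minimal fake scheduler with an advanceTo method. It is a sketch under assumptions, not the in-memory scheduler's actual code: FakeTimerScheduler, schedule, and PendingTimer are hypothetical, and only the advanceTo name comes from the test above.

```typescript
// Illustrative fake scheduler; the real scheduler's internals are not shown on this page.
interface PendingTimer {
  fireAt: number
  callback: () => void
}

class FakeTimerScheduler {
  private timers: PendingTimer[] = []

  schedule(fireAt: Date, callback: () => void): void {
    this.timers.push({ fireAt: fireAt.getTime(), callback })
  }

  // Fire every timer due at or before `target`, earliest first.
  advanceTo(target: Date): void {
    const limit = target.getTime()
    const due = this.timers
      .filter((t) => t.fireAt <= limit)
      .sort((a, b) => a.fireAt - b.fireAt)
    // Keep only timers that are still in the future.
    this.timers = this.timers.filter((t) => t.fireAt > limit)
    for (const timer of due) timer.callback()
  }
}

const scheduler = new FakeTimerScheduler()
const fired: string[] = []
const t0 = new Date('2024-01-01T00:00:00.000Z').getTime()
const HOUR = 60 * 60 * 1000

scheduler.schedule(new Date(t0 + 24 * HOUR), () => fired.push('payment-timeout'))
scheduler.schedule(new Date(t0 + 48 * HOUR), () => fired.push('later-timer'))

// Jump 25 hours ahead: only the 24-hour timer is due.
scheduler.advanceTo(new Date(t0 + 25 * HOUR))
console.log(fired.length) // 1
```

Because no real clock is involved, a test that covers a 24-hour timeout finishes immediately and deterministically.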

Error Handling

When a block throws anything other than a DoubleLockError, the process is marked as failed and the error is captured in its state:

error-handling.ts (TypeScript)
// Process that might fail
export const RiskyProcess = createProcess({
  path: '/risky/:id',
  inject: { service: RiskyService },

  async handler({ service }, [id]) {
    // If this throws, process goes to 'failed' status
    const result = await service.doRiskyThing(id)
    return { success: true, result }
  },
})

// In your code
const handle = await RiskyProcess(['123'])

try {
  const result = await handle.wait()
  console.log('Success:', result)
} catch (error) {
  // Error from the process; catch variables are `unknown` under strict mode
  console.error('Process failed:', error instanceof Error ? error.message : error)
}

// Check state directly
const state = await runtime.storage.load(handle.id)
if (state?.status === 'failed') {
  console.log('Error:', state.error)
}

Scoped Executor

Use withExecutor to scope an executor to a specific code block (useful for tests or multi-tenant scenarios):

scoped.ts (TypeScript)
import { withExecutor, createInMemoryRuntime } from '@justscale/core/process'

// Create isolated runtime for this scope
const runtime = createInMemoryRuntime({ container })

await withExecutor(runtime.executor, async () => {
  // All process operations in here use this executor
  const handle = await MyProcess(['id'])
  await service.emitSignal('id', payload)
  const result = await handle.wait()
})

// Outside the scope, the previous executor is restored
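
The restore-on-exit behavior can be modeled with a save, swap, and try/finally. This is a simplified sketch, not how withExecutor is implemented: SketchExecutor, getCurrentExecutor, and withScopedExecutor are hypothetical names, and a module-level variable is used purely for brevity.

```typescript
// Stand-in executor type; only a name is needed to show which one is active.
interface SketchExecutor {
  name: string
}

let currentExecutor: SketchExecutor | undefined

function getCurrentExecutor(): SketchExecutor | undefined {
  return currentExecutor
}

// Install `executor` for the duration of `fn`, then restore the previous one.
async function withScopedExecutor<T>(
  executor: SketchExecutor,
  fn: () => Promise<T>,
): Promise<T> {
  const previous = currentExecutor
  currentExecutor = executor
  try {
    return await fn()
  } finally {
    // Runs on success and on throw, so the outer executor always comes back.
    currentExecutor = previous
  }
}
```

A plain variable like this is not safe when two scopes overlap concurrently. An implementation that needs per-async-context isolation could build on Node's AsyncLocalStorage instead.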

Storage Interface

Implement ProcessStorage to use custom backends:

custom-storage.ts (TypeScript)
import type { ProcessStorage, ProcessState, ProcessStatus } from '@justscale/core/process'

class RedisProcessStorage implements ProcessStorage {
  constructor(private redis: RedisClient) {}

  async save(state: ProcessState): Promise<void> {
    await this.redis.set(`process:${state.instanceId}`, JSON.stringify(state))
  }

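  async load(instanceId: string): Promise<ProcessState | null> {
    const data = await this.redis.get(`process:${instanceId}`)
    if (!data) return null
    const state = JSON.parse(data) as ProcessState
    // JSON turns Date fields into strings; revive the ones callers read as Dates
    if (state.lastErrorAt) state.lastErrorAt = new Date(state.lastErrorAt)
    return state
  }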

  async delete(instanceId: string): Promise<void> {
    await this.redis.del(`process:${instanceId}`)
  }

  async complete(instanceId: string, result: unknown): Promise<void> {
    const state = await this.load(instanceId)
    if (state) {
      state.status = 'completed'
      state.result = result
      await this.save(state)
    }
  }

  // ... other methods
}
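
Before wiring up a real backend, it can help to exercise the same four methods against a plain Map. The sketch below is self-contained and makes assumptions: SketchProcessState is a trimmed-down local shape (the real ProcessState has more fields), and MapProcessStorage covers only save, load, delete, and complete, not the other methods elided above.

```typescript
// Local, trimmed-down shape for illustration; the real ProcessState has more fields.
interface SketchProcessState {
  instanceId: string
  status: 'pending' | 'running' | 'suspended' | 'completed' | 'failed'
  result?: unknown
}

class MapProcessStorage {
  private states = new Map<string, SketchProcessState>()

  async save(state: SketchProcessState): Promise<void> {
    // Store a copy so later caller-side mutations do not leak into storage.
    this.states.set(state.instanceId, { ...state })
  }

  async load(instanceId: string): Promise<SketchProcessState | null> {
    const state = this.states.get(instanceId)
    return state ? { ...state } : null
  }

  async delete(instanceId: string): Promise<void> {
    this.states.delete(instanceId)
  }

  async complete(instanceId: string, result: unknown): Promise<void> {
    const state = await this.load(instanceId)
    if (state) {
      await this.save({ ...state, status: 'completed', result })
    }
  }
}
```

Copying on save and load keeps the map behaving like a remote store, where callers never share object references with the backend.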