Runtime & Testing

Setting up the process executor and testing durable processes

Durable processes need a runtime to execute. The runtime consists of three pluggable components: storage (persists state), signal bus (routes signals), and timer scheduler (manages durable timers).

In-Memory Runtime

For development and testing, use the in-memory runtime. State is lost on restart, but it's fast and requires no external dependencies:

setup.ts (TypeScript)
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/process'
import { container } from './container'
import { OrderFulfillment } from './processes'

// Create runtime with all components
const runtime = createInMemoryRuntime({ container })

// Set as global executor (required for signal emission)
setProcessExecutor(runtime.executor)

// Now processes can be started and signals emitted
const handle = await OrderFulfillment(['order-123'])
console.log(handle.status) // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

Production Runtime

For production, plug in persistent backends. JustScale provides PostgreSQL adapters:

production-setup.ts (TypeScript)
import {
  createProcessExecutor,
  setProcessExecutor,
  ScheduledTaskTimerScheduler,
} from '@justscale/process'
import { PgProcessStorage, PgSignalBus } from '@justscale/postgres'
import { container } from './container'

// PostgreSQL-backed runtime.
// pgClient, pubsub, and scheduledTaskRepo are not defined in this snippet;
// they are assumed to be created elsewhere in your application.
const executor = createProcessExecutor({
  container,
  storage: new PgProcessStorage(pgClient),
  signalBus: new PgSignalBus(pubsub),
  timerScheduler: new ScheduledTaskTimerScheduler(scheduledTaskRepo),
})

setProcessExecutor(executor)

// Start processing timers
executor.startTimers()

Process Handle

Starting a process returns a handle with status info and a promise for the result:

handle-api.ts (TypeScript)
import { OrderFulfillment } from './processes'

// Start a process
const handle = await OrderFulfillment(['order-123'])

// Handle properties
handle.id      // 'order/order-123/fulfillment' (instance ID)
handle.status  // 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

// Wait for completion (blocks until process finishes)
const result = await handle.wait()
// result is typed based on your handler's return type

// Check status without blocking
if (handle.status === 'suspended') {
  console.log('Waiting for signal...')
}

// Idempotent starts: the same params return the existing handle
const handle2 = await OrderFulfillment(['order-123'])
console.log(handle.id === handle2.id) // true
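
One way to picture idempotent starts is a registry keyed by instance ID. The sketch below is only an illustration under assumptions: buildInstanceId, startOnce, SketchHandle, and the '/order/:orderId/fulfillment' template are hypothetical names chosen to reproduce the ID shown above, and the library may derive IDs and deduplicate starts differently.

```typescript
// Hypothetical helper: fill ':name' placeholders with positional params.
function buildInstanceId(pathTemplate: string, params: readonly string[]): string {
  let next = 0
  return pathTemplate
    .split('/')
    .filter((segment) => segment.length > 0) // drop the empty leading segment
    .map((segment) => {
      if (segment.charAt(0) !== ':') return segment
      if (next >= params.length) throw new Error(`Missing param for ${segment}`)
      return params[next++]
    })
    .join('/')
}

// Hypothetical handle shape; the real handle also carries status and wait().
interface SketchHandle {
  id: string
}

const handlesById = new Map<string, SketchHandle>()

// Return the existing handle when the same instance ID was already started.
function startOnce(pathTemplate: string, params: readonly string[]): SketchHandle {
  const id = buildInstanceId(pathTemplate, params)
  const existing = handlesById.get(id)
  if (existing) return existing
  const created: SketchHandle = { id }
  handlesById.set(id, created)
  return created
}

const firstHandle = startOnce('/order/:orderId/fulfillment', ['order-123'])
const secondHandle = startOnce('/order/:orderId/fulfillment', ['order-123'])
console.log(firstHandle.id)                // order/order-123/fulfillment
console.log(firstHandle === secondHandle)  // true
```

Keying on the derived ID rather than on the raw params means two call sites that name the same order always converge on one process instance.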

Process Lifecycle

Processes move through these states:

  • pending - Created but not yet executing
  • running - Currently executing blocks
  • suspended - Waiting for signal or timer
  • completed - Finished successfully with result
  • failed - Threw an error, captured in state

lifecycle.ts (TypeScript)
// Illustrative fragments from different call sites, not one runnable file
// pending → running (start executing)
const handle = await MyProcess(['id'])  // status: 'pending' → 'running'

// running → suspended (hit signal/delay)
await signal(service.someEvent)  // status: 'suspended'

// suspended → running (signal emitted)
await service.someEvent('id', payload)  // status: 'running'

// running → completed (handler returns)
return { success: true }  // status: 'completed'

// running → failed (handler throws)
throw new Error('Oops')  // status: 'failed'
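
The transitions listed above can also be written down as a lookup table, which is handy when asserting state changes in tests. This is a minimal sketch, not the executor's internal state machine; allowedTransitions, canTransition, and isTerminal are hypothetical names, and the table encodes only the transitions named in this section.

```typescript
type ProcessStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

// Each status maps to the statuses it may move to next.
// 'completed' and 'failed' have no exits, so they are terminal.
const allowedTransitions: Record<ProcessStatus, readonly ProcessStatus[]> = {
  pending: ['running'],
  running: ['suspended', 'completed', 'failed'],
  suspended: ['running'],
  completed: [],
  failed: [],
}

function canTransition(from: ProcessStatus, to: ProcessStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1
}

function isTerminal(current: ProcessStatus): boolean {
  return allowedTransitions[current].length === 0
}

console.log(canTransition('suspended', 'running'))  // true
console.log(canTransition('completed', 'running'))  // false
console.log(isTerminal('failed'))                   // true
```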

Duration Helpers

Use duration helpers with delay() for readable timeouts:

durations.ts (TypeScript)
import { delay, seconds, minutes, hours, days } from '@justscale/process'

// All durations are converted to milliseconds internally
await delay(seconds(30))   // 30 seconds
await delay(minutes(5))    // 5 minutes
await delay(hours(24))     // 24 hours
await delay(days(7))       // 7 days

// In a race (inside a process handler, since the cases return a result)
const r = race()
switch (true) {
  case signal(r, orders.shipped):
    return { shipped: true }
  case delay(r, days(30)):
    return { expired: true, reason: '30-day shipping window exceeded' }
}
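
Since durations end up as milliseconds, helpers of this kind can be thought of as plain multipliers. The following is a hypothetical re-implementation for illustration, assuming each helper returns a number of milliseconds; the actual return type in @justscale/process may be different (for example a branded or object type).

```typescript
// Hypothetical stand-ins for the duration helpers; not the library's source.
type Milliseconds = number

const MS_PER_SECOND = 1000
const MS_PER_MINUTE = 60 * MS_PER_SECOND
const MS_PER_HOUR = 60 * MS_PER_MINUTE
const MS_PER_DAY = 24 * MS_PER_HOUR

const seconds = (n: number): Milliseconds => n * MS_PER_SECOND
const minutes = (n: number): Milliseconds => n * MS_PER_MINUTE
const hours = (n: number): Milliseconds => n * MS_PER_HOUR
const days = (n: number): Milliseconds => n * MS_PER_DAY

console.log(seconds(30)) // 30000
console.log(days(7))     // 604800000
```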

Testing Processes

The in-memory runtime provides direct access to components for testing:

process.test.ts (TypeScript)
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert'
import { createInMemoryRuntime, setProcessExecutor } from '@justscale/process'
import { TestContainer } from '@justscale/testing'
import { OrderFulfillment } from './processes'
import { OrderService, ShippingService } from './services'

describe('OrderFulfillment', () => {
  let runtime: ReturnType<typeof createInMemoryRuntime>
  let container: TestContainer

  beforeEach(() => {
    container = new TestContainer()
    // mockOrderService and mockShippingService are test doubles defined elsewhere
    container.register(OrderService, mockOrderService)
    container.register(ShippingService, mockShippingService)

    runtime = createInMemoryRuntime({ container })
    setProcessExecutor(runtime.executor)
  })

  afterEach(() => {
    runtime.stop()
    runtime.clear()
  })

  it('completes when all signals received', async () => {
    // Start process
    const handle = await OrderFulfillment(['order-123'])
    assert.strictEqual(handle.status, 'suspended')

    // Emit payment signal
    await runtime.executor.emit('orders.paymentReceived',
      { orderId: 'order-123' },
      { txId: 'tx-456' }
    )

    // Check suspended again (waiting for shipped)
    const state = await runtime.storage.load(handle.id)
    assert.strictEqual(state?.status, 'suspended')

    // Emit shipped signal
    await runtime.executor.emit('orders.shipped',
      { orderId: 'order-123' },
      { trackingNumber: 'TRACK-789' }
    )

    // Wait for completion
    const result = await handle.wait()
    assert.deepStrictEqual(result, {
      status: 'completed',
      orderId: 'order-123',
    })
  })
})

Testing Timeouts

The in-memory timer scheduler supports time manipulation for testing:

timeout.test.ts (TypeScript)
it('expires after timeout', async () => {
  const handle = await PaymentProcess(['pay-123'])
  assert.strictEqual(handle.status, 'suspended')

  // Advance time past the timeout
  const futureDate = new Date(Date.now() + 25 * 60 * 60 * 1000) // +25 hours
  runtime.timerScheduler.advanceTo(futureDate)

  // Process should complete with timeout result
  const result = await handle.wait()
  assert.strictEqual(result.status, 'timeout')
})
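
To see why advancing a clock makes timeout tests deterministic, it can help to look at a tiny stand-alone fake. This is a sketch and not the scheduler that ships with the in-memory runtime: FakeTimerScheduler, schedule, and the callback shape are hypothetical, and only the advanceTo name is taken from the example above.

```typescript
// Hypothetical in-memory timer scheduler, for illustration only.
type TimerCallback = () => void

interface PendingTimer {
  fireAt: number // epoch milliseconds
  callback: TimerCallback
}

class FakeTimerScheduler {
  private timers: PendingTimer[] = []

  schedule(fireAt: Date, callback: TimerCallback): void {
    this.timers.push({ fireAt: fireAt.getTime(), callback })
  }

  // Fire every timer due at or before `target`, earliest first,
  // and return how many timers fired.
  advanceTo(target: Date): number {
    const cutoff = target.getTime()
    const due = this.timers
      .filter((timer) => timer.fireAt <= cutoff)
      .sort((a, b) => a.fireAt - b.fireAt)
    this.timers = this.timers.filter((timer) => timer.fireAt > cutoff)
    due.forEach((timer) => timer.callback())
    return due.length
  }
}

const HOUR_MS = 60 * 60 * 1000
const baseTime = Date.UTC(2024, 0, 1) // fixed start time keeps the demo deterministic
const fakeScheduler = new FakeTimerScheduler()
const firedLabels: string[] = []

fakeScheduler.schedule(new Date(baseTime + 24 * HOUR_MS), () => firedLabels.push('payment-timeout'))
fakeScheduler.schedule(new Date(baseTime + 48 * HOUR_MS), () => firedLabels.push('reminder'))

// Jump 25 hours ahead: only the 24-hour timer is due.
fakeScheduler.advanceTo(new Date(baseTime + 25 * HOUR_MS))
console.log(firedLabels) // [ 'payment-timeout' ]
```

No real waiting takes place, so a 24-hour timeout can be exercised in a test that finishes in milliseconds.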

Error Handling

When a block throws, the process is marked as failed and the error is captured:

error-handling.ts (TypeScript)
// Process that might fail
export const RiskyProcess = createProcess({
  path: '/risky/:id',
  inject: { service: RiskyService },

  async handler({ service }, [id]) {
    // If this throws, process goes to 'failed' status
    const result = await service.doRiskyThing(id)
    return { success: true, result }
  },
})

// In your code
const handle = await RiskyProcess(['123'])

try {
  const result = await handle.wait()
  console.log('Success:', result)
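} catch (error) {
  // Error from the process; catch variables are `unknown` under strict TypeScript
  const message = error instanceof Error ? error.message : String(error)
  console.error('Process failed:', message)
}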

// Check state directly
const state = await runtime.storage.load(handle.id)
if (state?.status === 'failed') {
  console.log('Error:', state.error)
}

Scoped Executor

Use withExecutor to scope an executor to a specific code block (useful for tests or multi-tenant scenarios):

scoped.ts (TypeScript)
import { withExecutor, createInMemoryRuntime } from '@justscale/process'

// Create isolated runtime for this scope
const runtime = createInMemoryRuntime({ container })

await withExecutor(runtime.executor, async () => {
  // All process operations in here use this executor
  const handle = await MyProcess(['id'])
  await service.emitSignal('id', payload)
  const result = await handle.wait()
})

// Outside the scope, the previous executor is restored
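
The restore-on-exit behaviour can be modelled with a save/restore pattern around a module-level variable. This is a sketch under assumptions, not the library's implementation: ExecutorLike, setCurrentExecutor, getCurrentExecutor, and withScopedExecutor are hypothetical names, and the real withExecutor may track the active executor differently.

```typescript
// Hypothetical stand-in for the executor type.
interface ExecutorLike {
  readonly label: string
}

let currentExecutor: ExecutorLike | undefined

function setCurrentExecutor(executor: ExecutorLike | undefined): void {
  currentExecutor = executor
}

function getCurrentExecutor(): ExecutorLike | undefined {
  return currentExecutor
}

// Run `fn` with `executor` active, then put back whatever was active before.
async function withScopedExecutor<T>(
  executor: ExecutorLike,
  fn: () => Promise<T>,
): Promise<T> {
  const previous = currentExecutor
  currentExecutor = executor
  try {
    return await fn()
  } finally {
    // Restore even when fn throws, so a failing scope cannot leak its executor.
    currentExecutor = previous
  }
}
```

A single shared variable like this is only safe when scopes do not overlap in time. For concurrent scopes (for example, parallel requests from different tenants), an implementation would need per-call-chain context such as Node's AsyncLocalStorage.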

Storage Interface

Implement ProcessStorage to use custom backends:

custom-storage.ts (TypeScript)
import type { ProcessStorage, ProcessState, ProcessStatus } from '@justscale/process'

class RedisProcessStorage implements ProcessStorage {
  constructor(private redis: RedisClient) {}

  async save(state: ProcessState): Promise<void> {
    await this.redis.set(`process:${state.instanceId}`, JSON.stringify(state))
  }

  async load(instanceId: string): Promise<ProcessState | null> {
    const data = await this.redis.get(`process:${instanceId}`)
    return data ? JSON.parse(data) : null
  }

  async delete(instanceId: string): Promise<void> {
    await this.redis.del(`process:${instanceId}`)
  }

  async complete(instanceId: string, result: unknown): Promise<void> {
    const state = await this.load(instanceId)
    if (state) {
      state.status = 'completed'
      state.result = result
      await this.save(state)
    }
  }

  // ... other methods
}
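
For trying out the same four methods without a Redis server, a Map-backed variant is enough. The sketch below is self-contained and therefore declares its own stand-in types: SketchStatus and SketchState are assumptions that model only the fields used in this section, and MapProcessStorage is a hypothetical class that does not claim to satisfy the full ProcessStorage interface, whose remaining methods are not shown here.

```typescript
// Local stand-ins for the types imported from '@justscale/process' above.
// Only the fields used in this section are modelled; the real types may differ.
type SketchStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed'

interface SketchState {
  instanceId: string
  status: SketchStatus
  result?: unknown
  error?: unknown
}

class MapProcessStorage {
  private states = new Map<string, string>()

  async save(state: SketchState): Promise<void> {
    // Serialize on write so callers cannot mutate stored state by reference,
    // which mirrors how a network-backed store behaves.
    this.states.set(state.instanceId, JSON.stringify(state))
  }

  async load(instanceId: string): Promise<SketchState | null> {
    const data = this.states.get(instanceId)
    return data === undefined ? null : (JSON.parse(data) as SketchState)
  }

  async delete(instanceId: string): Promise<void> {
    this.states.delete(instanceId)
  }

  async complete(instanceId: string, result: unknown): Promise<void> {
    const state = await this.load(instanceId)
    if (state) {
      await this.save({ ...state, status: 'completed', result })
    }
  }
}
```

Running the same assertions against this class and a custom backend is a cheap way to check that the backend round-trips state as expected.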