Contextual Controllers

Build RPC-style WebSocket APIs with procedures

Contextual controllers let you define WebSocket APIs as a collection of procedures, similar to tRPC or JSON-RPC. Instead of handling raw messages, you define typed procedures that are invoked by command name.

Why Contextual Controllers?

Traditional WebSocket handlers process messages in a single loop:

traditional.tsTypeScript
// Traditional approach - one big switch
Ws('/').handle(async ({ messages, send }) => {
  for await (const msg of messages) {
    switch (msg.type) {
      case 'join':
        // join logic...
        break;
      case 'leave':
        // leave logic...
        break;
      case 'message':
        // message logic...
        break;
      // Gets unwieldy with many message types
    }
  }
});

Contextual controllers split this into focused procedures:

contextual.tsTypeScript
// Contextual approach - separate procedures
const ChatProcedures = createController
  .withContext<ChatSession>()
  .create({
    routes: (_, { Procedure }) => ({
      join: Procedure('room/:roomId/join')
        .handle(({ session, params }) => {
          // Just join logic
        }),

      leave: Procedure('room/:roomId/leave')
        .handle(({ session, params }) => {
          // Just leave logic
        }),

      message: Procedure('room/:roomId/message')
        .body(z.object({ content: z.string() }))
        .handle(({ session, params, body }) => {
          // Just message logic
        }),
    }),
  });

Creating a Contextual Controller

Use createController.withContext<T>() to create a controller with session context. The context type represents the connection state:

Files
chat-session.tsTypeScript
import type { RawMessageSource } from '@justscale/core';
import type { RoomSubscription } from './chat-service';

// Session context - bound when WebSocket connects
export interface ChatSession {
  // User identity
  username: string;

  // Raw WebSocket adapter for session.run()
  ws: RawMessageSource;

  // Send typed message to this client
  send: (msg: ServerMessage) => void;

  // Rooms the user has joined
  rooms: Map<string, {
    subscription: RoomSubscription;
  }>;
}

Using Procedures in WebSocket

Create a session and run it in your WebSocket handler:

chat-controller.tsTypeScript
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { z } from 'zod';
import { ChatProcedures, type ChatSession } from './chat-procedures';

// Command message format
const CommandSchema = z.object({
  command: z.string(),        // e.g., "room/general/join"
  payload: z.record(z.unknown()).optional(),
});

export const ChatController = createController('/chat', {
  inject: { procedures: ChatProcedures },

  routes: (services) => ({
    ws: Ws('/')
      .message(CommandSchema)
      .handle(async ({ messages, send, query }) => {
        const username = query.username || 'Anonymous';

        // Build session context
        const session: ChatSession = {
          username,
          ws: {
            rawMessages: async function* () {
              for await (const msg of messages) {
                yield JSON.stringify(msg);
              }
            },
            send: (data: string) => send(JSON.parse(data)),
          },
          send: (msg) => send(msg),
          rooms: new Map(),
        };

        // Create procedural session
        using procedureSession = services.procedures.createSession(session);

        // Clean up subscriptions on disconnect
        procedureSession.onDispose(() => {
          for (const [, room] of session.rooms) {
            room.subscription.unsubscribe();
          }
        });

        // Run the session - routes commands to procedures
        await procedureSession.run();
      }),
  }),
});
ℹ️

Info

The session.run() method reads commands from the WebSocket, matches them to procedures, and invokes handlers. Generator responses are streamed back to the client.

Procedure Patterns

Path Parameters

Procedures support path parameters just like HTTP routes:

path-params.tsTypeScript
// Client sends: { command: "room/general/join" }
join: Procedure('room/:roomId/join')
  .handle(({ params }) => {
    // params.roomId === 'general'
  }),

// Client sends: { command: "user/123/follow" }
follow: Procedure('user/:userId/follow')
  .handle(({ params }) => {
    // params.userId === '123'
  }),

Request Body

Use .body() to validate the command payload:

body-validation.tsTypeScript
// Client sends: { command: "room/general/message", payload: { content: "Hi!" } }
message: Procedure('room/:roomId/message')
  .body(z.object({
    content: z.string().min(1).max(1000),
  }))
  .handle(({ params, body }) => {
    // body.content is validated and typed
    console.log(`Message in ${params.roomId}: ${body.content}`);
  }),

Middleware

Apply middleware to individual procedures:

middleware.tsTypeScript
// Middleware: require room membership
function requireMembership({ session, params }: {
  session: ChatSession;
  params: { roomId: string };
}) {
  if (!session.rooms.has(params.roomId)) {
    throw new Error(`Not a member of ${params.roomId}`);
  }
  return { room: session.rooms.get(params.roomId)! };
}

// Apply to procedure
message: Procedure('room/:roomId/message')
  .use(requireMembership)  // Adds 'room' to context
  .body(z.object({ content: z.string() }))
  .handle(({ room, body }) => {
    // Can access room from middleware
  }),

Streaming Responses

Use async generators to stream multiple responses:

streaming.tsTypeScript
// Generator procedure - streams responses
join: Procedure('room/:roomId/join')
  .handle(async function*({ session, params }) {
    const subscription = services.chat.subscribe(params.roomId);

    // Send immediate response
    yield { type: 'joined', roomId: params.roomId };

    // Stream messages as they arrive
    for await (const msg of subscription) {
      yield msg;  // Each yield sends to client
    }

    // When subscription ends, generator completes
  }),

// Non-generator - single response
leave: Procedure('room/:roomId/leave')
  .handle(({ params }) => {
    // Single return value sent to client
    return { left: params.roomId };
  }),

Session Lifecycle

The session provides lifecycle hooks for cleanup:

lifecycle.tsTypeScript
// Create session with using for auto-disposal
using session = services.procedures.createSession(context);

// Register cleanup handler
session.onDispose(() => {
  // Called when:
  // - Client disconnects
  // - Handler returns/throws
  // - using block exits

  // Clean up subscriptions
  for (const [, room] of context.rooms) {
    room.subscription.unsubscribe();
  }

  // Broadcast leave to all rooms
  for (const [roomId] of context.rooms) {
    services.chat.broadcast(roomId, {
      type: 'leave',
      username: context.username,
      timestamp: Date.now(),
    });
  }
});

// Run the session
await session.run();

Error Handling

Errors in procedures are caught and sent to the client:

errors.tsTypeScript
message: Procedure('room/:roomId/message')
  .handle(({ params, body }) => {
    // Throwing sends error to client
    if (!body.content.trim()) {
      throw new Error('Message cannot be empty');
    }

    // Validation errors from .body() are also sent
  }),

// Client receives:
// { error: 'Message cannot be empty', command: 'room/general/message' }

Client Protocol

The client sends commands as JSON objects:

client-example.tsTypeScript
const ws = new WebSocket('ws://localhost:3000/chat?username=alice');

// Join a room
ws.send(JSON.stringify({
  command: 'room/general/join',
}));

// Send a message
ws.send(JSON.stringify({
  command: 'room/general/message',
  payload: { content: 'Hello everyone!' },
}));

// Leave the room
ws.send(JSON.stringify({
  command: 'room/general/leave',
}));

// Handle responses
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  console.log('Received:', msg);
};

Complete Example

A full chat application with contextual controllers:

chat-procedures.tsTypeScript
import { createController } from '@justscale/core';
import { z } from 'zod';
import { ChatService } from './chat-service';

interface ChatSession {
  username: string;
  send: (msg: unknown) => void;
  rooms: Map<string, { subscription: any }>;
}

export const ChatProcedures = createController
  .withContext<ChatSession>()
  .create({
    inject: { chat: ChatService },
    routes: (services, { Procedure }) => ({
      join: Procedure('room/:roomId/join')
        .handle(async function*({ session, params }) {
          const { roomId } = params;
          if (session.rooms.has(roomId)) return { error: 'already_joined' };

          const sub = services.chat.subscribe(roomId);
          session.rooms.set(roomId, { subscription: sub });

          services.chat.broadcast(roomId, {
            type: 'join', username: session.username, timestamp: Date.now(),
          });

          yield { type: 'users', users: services.chat.getMembers(roomId) };
          for await (const msg of sub) yield msg;
        }),

      leave: Procedure('room/:roomId/leave')
        .handle(({ session, params }) => {
          const room = session.rooms.get(params.roomId);
          if (!room) return { error: 'not_member' };

          room.subscription.unsubscribe();
          session.rooms.delete(params.roomId);
          services.chat.broadcast(params.roomId, {
            type: 'leave', username: session.username, timestamp: Date.now(),
          });
          return { left: params.roomId };
        }),

      message: Procedure('room/:roomId/message')
        .body(z.object({ content: z.string() }))
        .handle(({ session, params, body }) => {
          if (!session.rooms.has(params.roomId)) {
            throw new Error('Not a member');
          }
          services.chat.broadcast(params.roomId, {
            type: 'message',
            username: session.username,
            content: body.content,
            timestamp: Date.now(),
          });
        }),
    }),
  });