Contextual Controllers
Build RPC-style WebSocket APIs with procedures
Contextual controllers let you define WebSocket APIs as a collection of procedures, similar to tRPC or JSON-RPC. Instead of handling raw messages, you define typed procedures that are invoked by command name.
Why Contextual Controllers?
Traditional WebSocket handlers process messages in a single loop:
// Traditional approach - one big switch
Ws('/').handle(async ({ messages, send }) => {
  for await (const msg of messages) {
    switch (msg.type) {
      case 'join':
        // join logic...
        break;
      case 'leave':
        // leave logic...
        break;
      case 'message':
        // message logic...
        break;
      // Gets unwieldy with many message types
    }
  }
});
Contextual controllers split this into focused procedures:
// Contextual approach - separate procedures
const ChatProcedures = createController
  .withContext<ChatSession>()
  .create({
    routes: (_, { Procedure }) => ({
      join: Procedure('room/:roomId/join')
        .handle(({ session, params }) => {
          // Just join logic
        }),
      leave: Procedure('room/:roomId/leave')
        .handle(({ session, params }) => {
          // Just leave logic
        }),
      message: Procedure('room/:roomId/message')
        .body(z.object({ content: z.string() }))
        .handle(({ session, params, body }) => {
          // Just message logic
        }),
    }),
  });
Creating a Contextual Controller
Use createController.withContext<T>() to create a controller with session context. The context type represents the connection state:
import type { RawMessageSource } from '@justscale/core';
import type { RoomSubscription } from './chat-service';
// ServerMessage is the application's own server-to-client message type (definition not shown here)
// Session context - bound when WebSocket connects
export interface ChatSession {
  // User identity
  username: string;
  // Raw WebSocket adapter for session.run()
  ws: RawMessageSource;
  // Send typed message to this client
  send: (msg: ServerMessage) => void;
  // Rooms the user has joined
  rooms: Map<string, {
    subscription: RoomSubscription;
  }>;
}
Using Procedures in WebSocket
Create a session and run it in your WebSocket handler:
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { z } from 'zod';
import { ChatProcedures, type ChatSession } from './chat-procedures';
// Command message format
const CommandSchema = z.object({
  command: z.string(), // e.g., "room/general/join"
  payload: z.record(z.unknown()).optional(),
});
export const ChatController = createController('/chat', {
  inject: { procedures: ChatProcedures },
  routes: (services) => ({
    ws: Ws('/')
      .message(CommandSchema)
      .handle(async ({ messages, send, query }) => {
        const username = query.username || 'Anonymous';
        // Build session context
        const session: ChatSession = {
          username,
          ws: {
            rawMessages: async function* () {
              for await (const msg of messages) {
                yield JSON.stringify(msg);
              }
            },
            send: (data: string) => send(JSON.parse(data)),
          },
          send: (msg) => send(msg),
          rooms: new Map(),
        };
        // Create procedural session
        using procedureSession = services.procedures.createSession(session);
        // Clean up subscriptions on disconnect
        procedureSession.onDispose(() => {
          for (const [, room] of session.rooms) {
            room.subscription.unsubscribe();
          }
        });
        // Run the session - routes commands to procedures
        await procedureSession.run();
      }),
  }),
});
Info
The session.run() method reads commands from the WebSocket, matches them to procedures, and invokes their handlers. Generator responses are streamed back to the client.
Procedure Patterns
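As a rough mental model of what happens to each incoming command, the sketch below matches a command string against registered patterns, invokes the first matching handler, and forwards either a single return value or every value an async generator yields. The framework's actual implementation is not shown in this guide; `matchCommand`, `isAsyncIterable`, `dispatch`, and the `Handler` type are hypothetical names used only for illustration, and skipping `undefined` return values is an assumption.

```typescript
// Hypothetical names throughout; this is not the framework's implementation.
type Params = Record<string, string>;
type Handler = (ctx: { params: Params; body: unknown }) => unknown;

// Match a command such as "room/general/join" against "room/:roomId/join".
// Returns the extracted parameters, or null when the command does not match.
function matchCommand(pattern: string, command: string): Params | null {
  const patternParts = pattern.split('/');
  const commandParts = command.split('/');
  if (patternParts.length !== commandParts.length) return null;

  const params: Params = {};
  for (const [i, expected] of patternParts.entries()) {
    const actual = commandParts[i] ?? '';
    if (expected.startsWith(':')) {
      if (actual === '') return null; // a parameter segment must not be empty
      params[expected.slice(1)] = actual;
    } else if (expected !== actual) {
      return null;
    }
  }
  return params;
}

// Detects async generators (and any other async iterable) returned by a handler.
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
  return typeof value === 'object' && value !== null && Symbol.asyncIterator in value;
}

// Find the first matching procedure, invoke it, and forward its output.
// Resolves to false when no pattern matches the command.
async function dispatch(
  routes: Record<string, Handler>,
  message: { command: string; payload?: unknown },
  send: (response: unknown) => void,
): Promise<boolean> {
  for (const [pattern, handler] of Object.entries(routes)) {
    const params = matchCommand(pattern, message.command);
    if (!params) continue;

    const result = handler({ params, body: message.payload });
    if (isAsyncIterable(result)) {
      for await (const item of result) send(item); // one message per yield
    } else {
      const value = await result; // works for sync and async handlers
      if (value !== undefined) send(value);
    }
    return true;
  }
  return false;
}
```

For simplicity this sketch awaits each generator to completion before returning. A real session presumably keeps long-lived streams (such as a room subscription) running while it continues to accept further commands.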
Path Parameters
Procedures support path parameters just like HTTP routes:
// Client sends: { command: "room/general/join" }
join: Procedure('room/:roomId/join')
  .handle(({ params }) => {
    // params.roomId === 'general'
  }),
// Client sends: { command: "user/123/follow" }
follow: Procedure('user/:userId/follow')
  .handle(({ params }) => {
    // params.userId === '123'
  }),
Request Body
Use .body() to validate the command payload:
// Client sends: { command: "room/general/message", payload: { content: "Hi!" } }
message: Procedure('room/:roomId/message')
  .body(z.object({
    content: z.string().min(1).max(1000),
  }))
  .handle(({ params, body }) => {
    // body.content is validated and typed
    console.log(`Message in ${params.roomId}: ${body.content}`);
  }),
Middleware
Apply middleware to individual procedures:
// Middleware: require room membership
function requireMembership({ session, params }: {
  session: ChatSession;
  params: { roomId: string };
}) {
  const room = session.rooms.get(params.roomId);
  if (!room) {
    throw new Error(`Not a member of ${params.roomId}`);
  }
  return { room };
}
// Apply to procedure
message: Procedure('room/:roomId/message')
  .use(requireMembership) // Adds 'room' to context
  .body(z.object({ content: z.string() }))
  .handle(({ room, body }) => {
    // Can access room from middleware
  }),
Streaming Responses
Use async generators to stream multiple responses:
// Generator procedure - streams responses
join: Procedure('room/:roomId/join')
  .handle(async function*({ session, params }) {
    const subscription = services.chat.subscribe(params.roomId);
    // Send immediate response
    yield { type: 'joined', roomId: params.roomId };
    // Stream messages as they arrive
    for await (const msg of subscription) {
      yield msg; // Each yield sends to client
    }
    // When subscription ends, generator completes
  }),
// Non-generator - single response
leave: Procedure('room/:roomId/leave')
  .handle(({ params }) => {
    // Single return value sent to client
    return { left: params.roomId };
  }),
Session Lifecycle
The session provides lifecycle hooks for cleanup:
// Create session with using for auto-disposal
using session = services.procedures.createSession(context);
// Register cleanup handler
session.onDispose(() => {
  // Called when:
  // - Client disconnects
  // - Handler returns/throws
  // - using block exits
  // Clean up subscriptions
  for (const [, room] of context.rooms) {
    room.subscription.unsubscribe();
  }
  // Broadcast leave to all rooms
  for (const [roomId] of context.rooms) {
    services.chat.broadcast(roomId, {
      type: 'leave',
      username: context.username,
      timestamp: Date.now(),
    });
  }
});
// Run the session
await session.run();
Error Handling
Errors in procedures are caught and sent to the client:
message: Procedure('room/:roomId/message')
  .body(z.object({ content: z.string() }))
  .handle(({ params, body }) => {
    // Throwing sends error to client
    if (!body.content.trim()) {
      throw new Error('Message cannot be empty');
    }
    // Validation errors from .body() are also sent
  }),
// Client receives:
// { error: 'Message cannot be empty', command: 'room/general/message' }
Client Protocol
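Responses and thrown errors arrive on the same socket, so a client usually needs to tell them apart. The following is a minimal sketch, assuming thrown errors always carry the `{ error, command }` shape from the Error Handling example; the full envelope is not specified in this guide, and `CommandError`, `isCommandError`, and `parseResponse` are hypothetical names.

```typescript
// Assumed error envelope, taken from the Error Handling example; other fields may exist.
interface CommandError {
  error: string;
  command: string;
}

// Type guard: true when a parsed message looks like an error envelope.
function isCommandError(msg: unknown): msg is CommandError {
  if (typeof msg !== 'object' || msg === null) return false;
  const candidate = msg as Record<string, unknown>;
  return typeof candidate.error === 'string' && typeof candidate.command === 'string';
}

// Parse a raw frame and classify it as either an error or a regular response.
function parseResponse(
  raw: string,
): { ok: false; error: CommandError } | { ok: true; data: unknown } {
  const msg: unknown = JSON.parse(raw);
  return isCommandError(msg) ? { ok: false, error: msg } : { ok: true, data: msg };
}
```

Handlers may also return plain objects such as `{ error: 'not_member' }` as ordinary results. Because those lack a `command` field, this guard deliberately treats them as regular responses.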
The client sends commands as JSON objects:
const ws = new WebSocket('ws://localhost:3000/chat?username=alice');
// Handle responses
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  console.log('Received:', msg);
};
// Send commands only once the connection is open; send() throws while still connecting
ws.onopen = () => {
  // Join a room
  ws.send(JSON.stringify({
    command: 'room/general/join',
  }));
  // Send a message
  ws.send(JSON.stringify({
    command: 'room/general/message',
    payload: { content: 'Hello everyone!' },
  }));
  // Leave the room
  ws.send(JSON.stringify({
    command: 'room/general/leave',
  }));
};
Complete Example
A full chat application with contextual controllers:
import { createController } from '@justscale/core';
import { z } from 'zod';
import { ChatService } from './chat-service';
interface ChatSession {
  username: string;
  send: (msg: unknown) => void;
  rooms: Map<string, { subscription: any }>;
}
export const ChatProcedures = createController
  .withContext<ChatSession>()
  .create({
    inject: { chat: ChatService },
    routes: (services, { Procedure }) => ({
      join: Procedure('room/:roomId/join')
        .handle(async function*({ session, params }) {
          const { roomId } = params;
          if (session.rooms.has(roomId)) {
            // Yield the error: a value returned from a generator is not seen by for await consumers
            yield { error: 'already_joined' };
            return;
          }
          const sub = services.chat.subscribe(roomId);
          session.rooms.set(roomId, { subscription: sub });
          services.chat.broadcast(roomId, {
            type: 'join', username: session.username, timestamp: Date.now(),
          });
          yield { type: 'users', users: services.chat.getMembers(roomId) };
          for await (const msg of sub) yield msg;
        }),
      leave: Procedure('room/:roomId/leave')
        .handle(({ session, params }) => {
          const room = session.rooms.get(params.roomId);
          if (!room) return { error: 'not_member' };
          room.subscription.unsubscribe();
          session.rooms.delete(params.roomId);
          services.chat.broadcast(params.roomId, {
            type: 'leave', username: session.username, timestamp: Date.now(),
          });
          return { left: params.roomId };
        }),
      message: Procedure('room/:roomId/message')
        .body(z.object({ content: z.string() }))
        .handle(({ session, params, body }) => {
          if (!session.rooms.has(params.roomId)) {
            throw new Error('Not a member');
          }
          services.chat.broadcast(params.roomId, {
            type: 'message',
            username: session.username,
            content: body.content,
            timestamp: Date.now(),
          });
        }),
    }),
  });
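The example imports `ChatService` from './chat-service' without showing it. The sketch below is one possible in-memory stand-in that satisfies the calls used above (`subscribe`, `broadcast`, `getMembers`, and an async-iterable subscription with `unsubscribe()`). Everything in it is an assumption: the class names, the `RoomEvent` shape, and in particular the choice to infer membership from `join`/`leave` events, since `subscribe()` receives no username. How such a class is registered for `inject` is outside this sketch.

```typescript
// Hypothetical event shape; the guide's real message type is not shown.
interface RoomEvent {
  type: string;
  username?: string;
  [key: string]: unknown;
}

// Async-iterable subscription with an explicit unsubscribe().
class RoomSubscription implements AsyncIterable<RoomEvent> {
  private queue: RoomEvent[] = [];
  private waiters: Array<(result: IteratorResult<RoomEvent>) => void> = [];
  private closed = false;

  constructor(private readonly onClose: (sub: RoomSubscription) => void) {}

  // Called by the service for every broadcast to this room.
  push(event: RoomEvent): void {
    if (this.closed) return;
    const waiter = this.waiters.shift();
    if (waiter) waiter({ value: event, done: false });
    else this.queue.push(event);
  }

  // Ends iteration, drops buffered events, and detaches from the room. Safe to call twice.
  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    this.queue.length = 0;
    this.onClose(this);
    for (const waiter of this.waiters.splice(0)) waiter({ value: undefined, done: true });
  }

  [Symbol.asyncIterator](): AsyncIterator<RoomEvent> {
    return {
      next: async (): Promise<IteratorResult<RoomEvent>> => {
        const event = this.queue.shift();
        if (event !== undefined) return { value: event, done: false };
        if (this.closed) return { value: undefined, done: true };
        // Nothing buffered yet: wait for the next push() or unsubscribe().
        return new Promise<IteratorResult<RoomEvent>>((resolve) => {
          this.waiters.push(resolve);
        });
      },
      // Invoked when a for await loop exits early (break, return, throw).
      return: async (): Promise<IteratorResult<RoomEvent>> => {
        this.unsubscribe();
        return { value: undefined, done: true };
      },
    };
  }
}

class InMemoryChatService {
  private rooms = new Map<string, Set<RoomSubscription>>();
  private members = new Map<string, Set<string>>();

  subscribe(roomId: string): RoomSubscription {
    const subs = this.rooms.get(roomId) ?? new Set<RoomSubscription>();
    this.rooms.set(roomId, subs);
    const sub = new RoomSubscription((closed) => { subs.delete(closed); });
    subs.add(sub);
    return sub;
  }

  broadcast(roomId: string, event: RoomEvent): void {
    // Assumption: membership is inferred from join/leave events.
    if (event.username !== undefined) {
      const names = this.members.get(roomId) ?? new Set<string>();
      this.members.set(roomId, names);
      if (event.type === 'join') names.add(event.username);
      if (event.type === 'leave') names.delete(event.username);
    }
    for (const sub of Array.from(this.rooms.get(roomId) ?? [])) sub.push(event);
  }

  getMembers(roomId: string): string[] {
    return Array.from(this.members.get(roomId) ?? []);
  }
}
```

Events are buffered per subscription until the consumer reads them, so a slow client does not block `broadcast()`. A production service would likely cap that buffer or apply backpressure.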