WebSocket
Real-time bidirectional communication with WebSocket routes
The @justscale/websocket package provides WebSocket route support using the same controller pattern as HTTP. Messages are handled via async iterables, making it natural to work with streams of data.
Installation
pnpm add @justscale/core @justscale/cluster @justscale/websocket zod
Complete Example
A real-time chat application with rooms, message broadcasting, and user tracking:
import { createController } from '@justscale/core'
import { Ws } from '@justscale/websocket'
import { ClientMessage, type ServerMessage } from '../schemas/message'
import { RoomService } from '../services/room-service'

export const ChatController = createController('/chat', {
  inject: { rooms: RoomService },
  routes: ({ rooms }) => ({
    // WebSocket endpoint: ws://localhost:3000/chat/room/general?username=Alice
    room: Ws('/room/:roomId')
      .message(ClientMessage)
      .handle(async ({ messages, send, params, query }) => {
        const { roomId } = params
        const username = query.username || 'Anonymous'
        const clientId = crypto.randomUUID()

        // Join the room
        const room = rooms.join(roomId, clientId, username, send)

        // Send current users to the new client
        send({
          type: 'joined',
          username: 'System',
          content: `Users in room: ${rooms.getClients(roomId).join(', ')}`,
          timestamp: new Date().toISOString(),
        })

        try {
          // Process incoming messages
          for await (const msg of messages) {
            const serverMsg: ServerMessage = {
              type: msg.type === 'message' ? 'message' : 'typing',
              username,
              content: msg.content,
              timestamp: new Date().toISOString(),
            }
            // Broadcast to all other clients in the room
            room.broadcast(serverMsg)
            // Echo back to sender (broadcast excludes the sender)
            send(serverMsg)
          }
        } finally {
          // Client disconnected (or the loop threw) - leave room
          room.leave()
        }
      }),
  }),
})

The Ws route factory creates WebSocket endpoints. The handler receives an async iterable of messages and a send function for responses.
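To make the "async iterable of messages" idea concrete, here is a minimal, dependency-free sketch of a queue that turns pushed values into something a for await loop can consume. MessageQueue and collect are hypothetical names used only for illustration; this is not the @justscale/websocket implementation, which may work quite differently.

```typescript
// Hypothetical helper: exposes pushed values as an async iterable.
// Illustration only; not how @justscale/websocket is necessarily built.
class MessageQueue<T> implements AsyncIterable<T> {
  private buffer: T[] = []
  private waiting: ((result: IteratorResult<T>) => void)[] = []
  private closed = false

  // Called when a frame arrives: hand it to a waiting reader or buffer it.
  push(value: T): void {
    if (this.closed) return
    const resolve = this.waiting.shift()
    if (resolve) resolve({ value, done: false })
    else this.buffer.push(value)
  }

  // Called when the socket closes: buffered values can still be read,
  // after which iteration ends.
  close(): void {
    this.closed = true
    for (const resolve of this.waiting.splice(0)) {
      resolve({ value: undefined, done: true })
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        if (this.buffer.length > 0) {
          const value = this.buffer.shift() as T
          return Promise.resolve<IteratorResult<T>>({ value, done: false })
        }
        if (this.closed) {
          return Promise.resolve<IteratorResult<T>>({ value: undefined, done: true })
        }
        // Nothing buffered yet: park the reader until push() or close().
        return new Promise<IteratorResult<T>>((resolve) => {
          this.waiting.push(resolve)
        })
      },
    }
  }
}

// Drains a source the same way a handler drains `messages`.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const seen: T[] = []
  for await (const item of source) seen.push(item)
  return seen
}
```

The loop in a handler ends naturally once the source reports that it is done, which is why the disconnect cleanup in the chat example can simply follow the for await loop.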
Message Validation
Use .message() with a Zod schema to validate incoming messages:
import { z } from 'zod'

// Incoming message from client
export const ClientMessage = z.object({
  type: z.enum(['message', 'typing', 'join', 'leave']),
  content: z.string().optional(),
})
export type ClientMessage = z.infer<typeof ClientMessage>

// Outgoing message to client
export interface ServerMessage {
  type: 'message' | 'typing' | 'joined' | 'left' | 'error'
  username: string
  content?: string
  timestamp: string
}

Invalid messages are rejected automatically. The handler only receives validated, typed messages.
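As a rough picture of what that validation amounts to, the sketch below hand-writes checks equivalent to the ClientMessage schema without any dependency. parseClientMessage and ParseResult are hypothetical names for illustration; the package presumably delegates to Zod, and how a rejection is reported to the client is not covered here.

```typescript
// Plain-TypeScript mirror of the ClientMessage schema (illustration only).
type ClientMessageType = 'message' | 'typing' | 'join' | 'leave'

interface ClientMessage {
  type: ClientMessageType
  content?: string
}

type ParseResult =
  | { ok: true; value: ClientMessage }
  | { ok: false; error: string }

const CLIENT_TYPES: readonly string[] = ['message', 'typing', 'join', 'leave']

// Hypothetical helper: parses a raw text frame and validates its shape.
function parseClientMessage(raw: string): ParseResult {
  let data: unknown
  try {
    data = JSON.parse(raw)
  } catch {
    return { ok: false, error: 'invalid JSON' }
  }
  if (typeof data !== 'object' || data === null || Array.isArray(data)) {
    return { ok: false, error: 'expected an object' }
  }
  const { type, content } = data as Record<string, unknown>
  if (typeof type !== 'string' || !CLIENT_TYPES.includes(type)) {
    return { ok: false, error: 'unknown message type' }
  }
  if (content !== undefined && typeof content !== 'string') {
    return { ok: false, error: 'content must be a string' }
  }
  // Only the known fields are kept; extra keys are dropped.
  const value: ClientMessage = { type: type as ClientMessageType }
  if (content !== undefined) value.content = content
  return { ok: true, value }
}
```

A frame such as {"type":"typing"} passes, while {"type":"shout"} or a non-string content value does not, so code inside the handler never has to re-check the message shape.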
Room Management
Services manage shared state across WebSocket connections. This service tracks clients per room and handles broadcasting:
import { createService } from '@justscale/core'
import type { ServerMessage } from '../schemas/message'

type SendFn = (msg: ServerMessage) => void

interface Client {
  username: string
  send: SendFn
}

export const RoomService = createService({
  inject: {},
  factory: () => {
    const rooms = new Map<string, Map<string, Client>>()

    return {
      join(roomId: string, clientId: string, username: string, send: SendFn) {
        if (!rooms.has(roomId)) {
          rooms.set(roomId, new Map())
        }
        const room = rooms.get(roomId)!
        room.set(clientId, { username, send })

        // Notify others
        this.broadcast(roomId, clientId, {
          type: 'joined',
          username,
          timestamp: new Date().toISOString(),
        })

        return {
          broadcast: (msg: ServerMessage) =>
            this.broadcast(roomId, clientId, msg),
          leave: () => this.leave(roomId, clientId, username),
        }
      },

      leave(roomId: string, clientId: string, username: string) {
        const room = rooms.get(roomId)
        if (!room) return
        room.delete(clientId)

        // Notify others
        this.broadcast(roomId, clientId, {
          type: 'left',
          username,
          timestamp: new Date().toISOString(),
        })

        // Cleanup empty room
        if (room.size === 0) {
          rooms.delete(roomId)
        }
      },

      broadcast(roomId: string, excludeId: string, msg: ServerMessage) {
        const room = rooms.get(roomId)
        if (!room) return
        for (const [id, client] of room) {
          if (id !== excludeId) {
            client.send(msg)
          }
        }
      },

      getClients(roomId: string): string[] {
        const room = rooms.get(roomId)
        return room ? Array.from(room.values()).map((c) => c.username) : []
      },
    }
  },
})

Server Setup
WebSocket routes work with @justscale/cluster. WebSocket connections share the same HTTP port:
import { createClusterBuilder, wrapWithCluster } from '@justscale/cluster'
import { ChatController } from './controllers/chat'
import { RoomService } from './services/room-service'

const built = createClusterBuilder()
  .add(RoomService)
  .add(ChatController)
  .build()

const cluster = wrapWithCluster(built.compile())
await cluster.serve({ http: 3000 })

console.log('Chat server running on http://localhost:3000')
console.log(
  'WebSocket: ws://localhost:3000/chat/room/:roomId?username=YourName',
)

Handler Context
WebSocket handlers receive a context object with:
- messages - Async iterable of incoming messages (typed via .message())
- send - Function to send messages to the client
- params - URL path parameters (e.g., roomId)
- query - URL query parameters (e.g., username)
- Middleware context - Properties added by .use()
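Because the context is just an object of plain values and functions, handler logic can be exercised without a live socket. The sketch below assumes a simplified context shape inferred from the list above; WsContext, echoHandler, fromArray, and runEcho are hypothetical names, and the types actually exported by @justscale/websocket may differ.

```typescript
// Assumed, simplified context shape (not the package's real type).
interface WsContext<In, Out> {
  messages: AsyncIterable<In>
  send: (msg: Out) => void
  params: Record<string, string>
  query: Record<string, string | undefined>
}

// Example handler logic written against that shape.
async function echoHandler(ctx: WsContext<string, string>): Promise<void> {
  const { messages, send, params, query } = ctx
  const name = query.username ?? 'Anonymous'
  for await (const text of messages) {
    send(`[${params.roomId}] ${name}: ${text}`)
  }
}

// Test double: turns an array into an async iterable of messages.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item
}

// Runs the handler against fake inputs and returns everything it sent.
async function runEcho(inputs: string[]): Promise<string[]> {
  const sent: string[] = []
  await echoHandler({
    messages: fromArray(inputs),
    send: (msg) => {
      sent.push(msg)
    },
    params: { roomId: 'general' },
    query: { username: 'Alice' },
  })
  return sent
}
```

Feeding the handler a finite iterable makes the loop finish on its own, so the same pattern can cover the cleanup code that runs after a disconnect.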
Using the Chat
Connect with any WebSocket client:
// Browser
const ws = new WebSocket('ws://localhost:3000/chat/room/general?username=Alice');

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  console.log(`[${msg.username}]: ${msg.content}`);
};

// Send a message
ws.send(JSON.stringify({ type: 'message', content: 'Hello everyone!' }));

// Typing indicator
ws.send(JSON.stringify({ type: 'typing' }));

With Authentication
Chain the requireAuth guard to protect WebSocket endpoints:
import { Ws } from '@justscale/websocket';
import { requireAuth } from '@justscale/auth';

Ws('/chat/:roomId')
  .guard(requireAuth) // Validates token from query param or header
  .message(MessageSchema)
  .handle(async ({ messages, send, user }) => {
    // user is authenticated
    for await (const msg of messages) {
      send({ from: user.email, ...msg });
    }
  });

Tokens can be passed via Authorization header or ?token= query parameter.
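The two token locations can be pictured with a small, dependency-free helper. extractToken is a hypothetical function, not part of @justscale/auth; the Bearer scheme and the header-before-query precedence are assumptions made for this sketch, and the real guard may behave differently.

```typescript
// Hypothetical helper: looks for a token in the Authorization header first,
// then falls back to the ?token= query parameter.
function extractToken(
  requestUrl: string,
  headers: Record<string, string | undefined>,
): string | null {
  const header = headers['authorization']
  if (header) {
    // Assumption: the header uses the "Bearer <token>" scheme.
    const match = /^Bearer\s+(.+)$/i.exec(header.trim())
    const fromHeader = match?.[1]
    if (fromHeader) return fromHeader
  }
  // Request URLs are often relative, so a placeholder base is supplied.
  const url = new URL(requestUrl, 'http://localhost')
  return url.searchParams.get('token')
}
```

The browser WebSocket constructor does not let callers set custom headers, so browser clients would typically use the query parameter, while server-side clients can send the header.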