WebSocket
Real-time bidirectional communication with WebSocket routes
The @justscale/websocket package provides WebSocket route support using the same controller pattern as HTTP. Messages are handled via async iterables, making it natural to work with streams of data.
Installation
pnpm add @justscale/core @justscale/core/cluster @justscale/websocket zodComplete Example
A real-time chat application with rooms, message broadcasting, and user tracking:
import { createController } from '@justscale/core'
import { Ws } from '@justscale/websocket'
import { ClientMessage } from '../schemas/message'
import type { ChatMessage } from '../models/room'
import { RoomService } from '../services/room-service'
export const ChatController = createController('/chat', {
inject: { rooms: RoomService },
routes: ({ rooms }) => ({
// WebSocket endpoint: ws://localhost:3000/chat/room/general?username=Alice
room: Ws('/room/:roomId')
.message(ClientMessage)
.handle(async ({ messages, send, params, query }) => {
const { roomId } = params
const username = query.username || 'Anonymous'
// Get the room
const room = rooms.getRoom(roomId)
// Subscribe to room messages - forward to this client
const unsubscribe = room.subscribe((msg) => send(msg))
// Announce join
rooms.announceJoin(roomId, username)
try {
// Process incoming messages from this client
for await (const msg of messages) {
const chatMsg: ChatMessage = {
type: msg.type === 'message' ? 'message' : 'typing',
username,
content: msg.content,
timestamp: new Date().toISOString(),
}
// Publish to all subscribers (including self)
room.publish(chatMsg)
}
} finally {
// Client disconnected - cleanup
unsubscribe()
rooms.announceLeave(roomId, username)
}
}),
}),
})The Ws route factory creates WebSocket endpoints. The handler receives an async iterable of messages and a send function for responses.
Message Validation
Use .message() with a Zod schema to validate incoming messages:
import { z } from 'zod'
// Incoming message from client
export const ClientMessage = z.object({
type: z.enum(['message', 'typing']),
content: z.string().optional(),
})
export type ClientMessage = z.infer<typeof ClientMessage>Invalid messages are rejected automatically. The handler only receives validated, typed messages.
Room Management
Services manage shared state across WebSocket connections. This service tracks clients per room and handles broadcasting:
import { defineService } from '@justscale/core'
import type { ChatMessage } from '../models/room'
type MessageHandler = (msg: ChatMessage) => void
interface RoomInstance {
name: string
/** Subscribe to room messages. Returns unsubscribe function. */
subscribe(handler: MessageHandler): () => void
/** Publish a message to all subscribers. */
publish(msg: ChatMessage): void
}
/**
* Room service manages chat rooms with pub/sub messaging.
*
* Each room provides subscribe/publish for real-time messages.
* In production, this would be backed by PostgreSQL LISTEN/NOTIFY
* or Redis pub/sub for multi-instance support.
*/
export class RoomService extends defineService({
inject: {},
factory: () => {
// In-memory room storage with subscriber sets
const rooms = new Map<string, Set<MessageHandler>>()
function getOrCreateRoom(roomId: string): Set<MessageHandler> {
let subscribers = rooms.get(roomId)
if (!subscribers) {
subscribers = new Set()
rooms.set(roomId, subscribers)
}
return subscribers
}
return {
/**
* Get a room instance with subscribe/publish capabilities.
*/
getRoom(roomId: string): RoomInstance {
const subscribers = getOrCreateRoom(roomId)
return {
name: roomId,
subscribe(handler: MessageHandler) {
subscribers.add(handler)
return () => subscribers.delete(handler)
},
publish(msg: ChatMessage) {
for (const handler of subscribers) {
handler(msg)
}
},
}
},
/**
* Announce a user joining.
*/
announceJoin(roomId: string, username: string): void {
const room = this.getRoom(roomId)
room.publish({
type: 'joined',
username,
content: undefined,
timestamp: new Date().toISOString(),
})
},
/**
* Announce a user leaving.
*/
announceLeave(roomId: string, username: string): void {
const room = this.getRoom(roomId)
room.publish({
type: 'left',
username,
content: undefined,
timestamp: new Date().toISOString(),
})
// Cleanup empty rooms
const subscribers = rooms.get(roomId)
if (subscribers && subscribers.size === 0) {
rooms.delete(roomId)
}
},
}
},
}) {}Server Setup
WebSocket routes work with @justscale/core/cluster. WebSocket connections share the same HTTP port:
import JustScale from '@justscale/core'
import { ChatController } from './controllers/chat'
import { RoomService } from './services/room-service'
const app = JustScale()
.add(RoomService)
.add(ChatController)
.build()
await app.serve({ http: 3000 })
console.log('Chat server running on http://localhost:3000')
console.log(
'WebSocket: ws://localhost:3000/chat/room/:roomId?username=YourName',
)Handler Context
WebSocket handlers receive a context object with:
- messages - Async iterable of incoming messages (typed via
.message()) - send - Function to send messages to the client
- params - URL path parameters (e.g.,
roomId) - query - URL query parameters (e.g.,
username) - Middleware context - Properties added by
.use()
Using the Chat
Connect with any WebSocket client:
// Browser
const ws = new WebSocket('ws://localhost:3000/chat/room/general?username=Alice');
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
console.log(`[${msg.username}]: ${msg.content}`);
};
// Send a message
ws.send(JSON.stringify({ type: 'message', content: 'Hello everyone!' }));
// Typing indicator
ws.send(JSON.stringify({ type: 'typing' }));With Authentication
Chain requireAuth guard to protect WebSocket endpoints:
import { Ws } from '@justscale/websocket';
import { requireAuth } from '@justscale/auth';
Ws('/chat/:roomId')
.guard(requireAuth) // Validates token from query param or header
.message(MessageSchema)
.handle(async ({ messages, send, user }) => {
// user is authenticated
for await (const msg of messages) {
send({ from: user.email, ...msg });
}
});Tokens can be passed via Authorization header or ?token= query parameter.