Message Handling
Validate, process, and respond to WebSocket messages
WebSocket handlers receive messages through an async iterable, making it natural to process streams of data. This guide covers message validation, processing patterns, and error handling.
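For readers new to async iterables, the consumption pattern can be sketched without any framework. The names fakeMessages and collect below are hypothetical and exist only for illustration; they are not part of @justscale/websocket. The generator stands in for a connection that delivers a few messages and then closes.

```typescript
// Hypothetical stand-in for a handler's `messages` iterable: yields each item
// in order, then finishes, which is how a closed connection ends the loop.
async function* fakeMessages<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) {
    yield item;
  }
}

// Consume any async iterable with `for await`, collecting what it yields.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const seen: T[] = [];
  for await (const item of source) {
    seen.push(item);
  }
  // Reached only after the iterable is exhausted.
  return seen;
}

// Usage: resolves once the generator has finished.
collect(fakeMessages([{ n: 1 }, { n: 2 }])).then((all) => {
  console.log(all.length); // 2
});
```

The point of the sketch is that code placed after the for await loop runs only once the source is done, which is why cleanup logic can simply follow the loop.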
The Message Loop
Every WebSocket handler receives a messages async iterable. Use for await to process messages as they arrive:
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
export const EchoController = createController('/echo', {
  routes: () => ({
    ws: Ws('/').handle(async ({ messages, send }) => {
      // Loop runs until client disconnects
      for await (const message of messages) {
        console.log('Received:', message);
        send({ echo: message });
      }
      // Loop exits when connection closes
      console.log('Client disconnected');
    }),
  }),
});
Info
The messages iterable yields parsed JSON objects. Raw string messages are automatically parsed before reaching your handler.
Message Validation
Use the .message() builder to validate incoming messages with a Zod schema. Invalid messages are silently dropped:
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { z } from 'zod';
// Define message schema
const ChatMessage = z.object({
  type: z.enum(['message', 'typing', 'ping']),
  content: z.string().optional(),
  roomId: z.string(),
});
export const ChatController = createController('/chat', {
  routes: () => ({
    ws: Ws('/')
      .message(ChatMessage) // Validates all messages
      .handle(async ({ messages, send }) => {
        for await (const msg of messages) {
          // msg is typed: { type: 'message' | 'typing' | 'ping', ... }
          switch (msg.type) {
            case 'message':
              send({ received: msg.content });
              break;
            case 'typing':
              // Broadcast typing indicator
              break;
            case 'ping':
              send({ type: 'pong' });
              break;
          }
        }
      }),
  }),
});
Custom Validation Errors
By default, invalid messages are dropped. To send validation errors to the client, handle validation manually:
import { Ws } from '@justscale/websocket';
import { z } from 'zod';
const MessageSchema = z.object({
  type: z.string(),
  payload: z.unknown(),
});
Ws('/').handle(async ({ messages, send }) => {
  for await (const raw of messages) {
    const result = MessageSchema.safeParse(raw);
    if (!result.success) {
      // Report the failure to the client and skip this message
      send({
        error: 'validation_failed',
        details: result.error.flatten(),
      });
      continue;
    }
    // Process validated message (handleMessage is your own function, not shown here)
    handleMessage(result.data);
  }
});
Discriminated Message Types
For protocols with multiple message types, use Zod's discriminated union to get type-safe handling:
import { z } from 'zod';
import { Ws } from '@justscale/websocket';
// Define each message type
const JoinMessage = z.object({
  type: z.literal('join'),
  roomId: z.string(),
  username: z.string(),
});
const LeaveMessage = z.object({
  type: z.literal('leave'),
  roomId: z.string(),
});
const ChatMessage = z.object({
  type: z.literal('chat'),
  roomId: z.string(),
  content: z.string(),
});
// Combine with discriminated union
const ClientMessage = z.discriminatedUnion('type', [
  JoinMessage,
  LeaveMessage,
  ChatMessage,
]);
Ws('/')
  .message(ClientMessage)
  .handle(async ({ messages, send }) => {
    for await (const msg of messages) {
      // TypeScript knows the exact type based on msg.type
      switch (msg.type) {
        case 'join':
          // msg is { type: 'join', roomId: string, username: string }
          console.log(`${msg.username} joining ${msg.roomId}`);
          break;
        case 'leave':
          // msg is { type: 'leave', roomId: string }
          console.log(`User left ${msg.roomId}`);
          break;
        case 'chat':
          // msg is { type: 'chat', roomId: string, content: string }
          send({ echo: msg.content });
          break;
      }
    }
  });
Sending Messages
The send() function sends data to the connected client. Objects are automatically serialized to JSON:
Ws('/').handle(async ({ messages, send }) => {
  // Send object (serialized to JSON)
  send({ type: 'welcome', timestamp: Date.now() });
  // Send array
  send([1, 2, 3]);
  // Send string (sent as-is)
  send('Hello');
  for await (const msg of messages) {
    // Reply to each message
    send({ received: true, id: msg.id });
  }
});
Error Handling
Errors inside the message loop close the connection. Wrap individual message processing in try-catch to keep the connection alive:
import { Ws } from '@justscale/websocket';
Ws('/').handle(async ({ messages, send, logger }) => {
  try {
    for await (const msg of messages) {
      try {
        // Process message - errors here don't close connection
        // (processMessage is your own function, not shown here)
        const result = await processMessage(msg);
        send({ success: true, result });
      } catch (error) {
        // Send error to client
        send({
          success: false,
          error: error instanceof Error ? error.message : 'Unknown error',
        });
        logger.warn('Message processing failed', { error });
      }
    }
  } catch (error) {
    // Connection-level error (network issues, etc.)
    logger.error('WebSocket connection error', { error });
  } finally {
    // Always runs on disconnect
    logger.info('Client disconnected');
  }
});
Connection Lifecycle
The handler context provides lifecycle controls:
closed - Promise that resolves when the connection closes
close(code?, reason?) - Gracefully close the connection
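When picking a value for the code argument, the close-code ranges defined in RFC 6455 (section 7.4.2) are a useful guide. The sketch below assumes that close() forwards the code to the client unchanged, which the text above does not state; classifyCloseCode is a hypothetical helper, not part of @justscale/websocket.

```typescript
// Close-code ranges as defined in RFC 6455, section 7.4.2.
type CloseCodeKind = 'protocol' | 'registered' | 'private' | 'invalid';

// Hypothetical helper: reports which RFC 6455 range a close code falls into.
function classifyCloseCode(code: number): CloseCodeKind {
  if (!Number.isInteger(code)) return 'invalid';
  // 1000-2999: reserved for the WebSocket protocol and its extensions
  if (code >= 1000 && code <= 2999) return 'protocol';
  // 3000-3999: libraries and frameworks, registered with IANA
  if (code >= 3000 && code <= 3999) return 'registered';
  // 4000-4999: private use, free for application-specific meanings
  if (code >= 4000 && code <= 4999) return 'private';
  // 0-999 are not used, and anything above 4999 is out of range
  return 'invalid';
}

console.log(classifyCloseCode(1000)); // 'protocol' (1000 means normal closure)
console.log(classifyCloseCode(4000)); // 'private'
```

In practice this means 1000 signals a normal closure, while application-specific reasons such as an idle timeout fit in the private range 4000-4999.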
Ws('/').handle(async ({ messages, send, close, closed }) => {
  // Set up timeout - close if no messages for 30 seconds
  // (undefined until the first resetTimeout() call)
  let timeout: NodeJS.Timeout | undefined;
  const resetTimeout = () => {
    clearTimeout(timeout);
    timeout = setTimeout(() => {
      close(4000, 'Idle timeout');
    }, 30000);
  };
  resetTimeout();
  // Listen for close event
  closed.then(() => {
    clearTimeout(timeout);
    console.log('Connection closed');
  });
  for await (const msg of messages) {
    resetTimeout(); // Reset on each message
    if (msg.type === 'quit') {
      close(1000, 'Client requested close');
      break;
    }
    send({ received: msg });
  }
});
Query Parameters
Access URL query parameters via the query object:
// Client connects to: ws://localhost:3000/chat?token=abc&room=general
Ws('/chat').handle(async ({ messages, send, query }) => {
  // Access query parameters
  const token = query.token; // 'abc'
  const room = query.room; // 'general'
  // Validate token (isValidToken is your own function, not shown here)
  if (!isValidToken(token)) {
    send({ error: 'Invalid token' });
    return; // Closes connection
  }
  send({ joined: room });
  for await (const msg of messages) {
    // ...
  }
});
Binary Messages
For binary data, messages arrive as ArrayBuffer. Check the type before processing:
Ws('/binary').handle(async ({ messages, send }) => {
  for await (const msg of messages) {
    if (msg instanceof ArrayBuffer) {
      // Handle binary data
      const bytes = new Uint8Array(msg);
      console.log('Received', bytes.length, 'bytes');
      // Echo back
      send(msg);
    } else {
      // Handle JSON message
      send({ received: msg });
    }
  }
});
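Once a message is known to be an ArrayBuffer, it usually has to be decoded into something structured. The following is a minimal sketch using the standard DataView API. The frame layout (1-byte kind, 4-byte big-endian payload length, then the payload) and the names Frame, encodeFrame, and decodeFrame are assumptions made up for this example; @justscale/websocket does not define a binary format.

```typescript
// Hypothetical frame layout, for illustration only:
//   byte 0      message kind (unsigned 8-bit)
//   bytes 1-4   payload length (unsigned 32-bit, big-endian)
//   bytes 5..   payload
interface Frame {
  kind: number;
  payload: Uint8Array;
}

const HEADER_SIZE = 5;

function encodeFrame(kind: number, payload: Uint8Array): ArrayBuffer {
  const buffer = new ArrayBuffer(HEADER_SIZE + payload.byteLength);
  const view = new DataView(buffer);
  view.setUint8(0, kind);
  view.setUint32(1, payload.byteLength); // DataView writes big-endian by default
  new Uint8Array(buffer, HEADER_SIZE).set(payload);
  return buffer;
}

function decodeFrame(buffer: ArrayBuffer): Frame {
  if (buffer.byteLength < HEADER_SIZE) {
    throw new RangeError('Frame is shorter than the 5-byte header');
  }
  const view = new DataView(buffer);
  const kind = view.getUint8(0);
  const length = view.getUint32(1);
  if (buffer.byteLength !== HEADER_SIZE + length) {
    throw new RangeError('Length field does not match the payload size');
  }
  // A view over the payload bytes; no copy is made.
  return { kind, payload: new Uint8Array(buffer, HEADER_SIZE, length) };
}

// Usage: round-trip a three-byte payload.
const frame = decodeFrame(encodeFrame(7, new Uint8Array([1, 2, 3])));
console.log(frame.kind, frame.payload.length); // 7 3
```

Inside a handler, decodeFrame(msg) would be called in the msg instanceof ArrayBuffer branch, ideally wrapped in try/catch so that a malformed frame does not close the connection.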