WebSocket

Real-time bidirectional communication with WebSocket routes

The @justscale/websocket package provides WebSocket route support using the same controller pattern as HTTP. Messages are handled via async iterables, making it natural to work with streams of data.

Installation

Bash
pnpm add @justscale/core @justscale/core/cluster @justscale/websocket zod

Complete Example

A real-time chat application with rooms, message broadcasting, and user tracking:

Files
src/controllers/chat.tsTypeScript
import { createController } from '@justscale/core'
import { Ws } from '@justscale/websocket'
import { ClientMessage } from '../schemas/message'
import type { ChatMessage } from '../models/room'
import { RoomService } from '../services/room-service'

export const ChatController = createController('/chat', {
  inject: { rooms: RoomService },

  routes: ({ rooms }) => ({
    // WebSocket endpoint: ws://localhost:3000/chat/room/general?username=Alice
    room: Ws('/room/:roomId')
      .message(ClientMessage)
      .handle(async ({ messages, send, params, query }) => {
        const { roomId } = params
        const username = query.username || 'Anonymous'

        // Get the room
        const room = rooms.getRoom(roomId)

        // Subscribe to room messages - forward to this client
        const unsubscribe = room.subscribe((msg) => send(msg))

        // Announce join
        rooms.announceJoin(roomId, username)

        try {
          // Process incoming messages from this client
          for await (const msg of messages) {
            const chatMsg: ChatMessage = {
              type: msg.type === 'message' ? 'message' : 'typing',
              username,
              content: msg.content,
              timestamp: new Date().toISOString(),
            }

            // Publish to all subscribers (including self)
            room.publish(chatMsg)
          }
        } finally {
          // Client disconnected - cleanup
          unsubscribe()
          rooms.announceLeave(roomId, username)
        }
      }),
  }),
})

The Ws route factory creates WebSocket endpoints. The handler receives an async iterable of messages and a send function for responses.

Message Validation

Use .message() with a Zod schema to validate incoming messages:

Files
src/schemas/message.tsTypeScript
import { z } from 'zod'

// Incoming message from client
export const ClientMessage = z.object({
  type: z.enum(['message', 'typing']),
  content: z.string().optional(),
})

export type ClientMessage = z.infer<typeof ClientMessage>

Invalid messages are rejected automatically. The handler only receives validated, typed messages.

Room Management

Services manage shared state across WebSocket connections. This service tracks clients per room and handles broadcasting:

Files
src/services/room-service.tsTypeScript
import { defineService } from '@justscale/core'
import type { ChatMessage } from '../models/room'

type MessageHandler = (msg: ChatMessage) => void

interface RoomInstance {
  name: string
  /** Subscribe to room messages. Returns unsubscribe function. */
  subscribe(handler: MessageHandler): () => void
  /** Publish a message to all subscribers. */
  publish(msg: ChatMessage): void
}

/**
 * Room service manages chat rooms with pub/sub messaging.
 *
 * Each room provides subscribe/publish for real-time messages.
 * In production, this would be backed by PostgreSQL LISTEN/NOTIFY
 * or Redis pub/sub for multi-instance support.
 */
export class RoomService extends defineService({
  inject: {},
  factory: () => {
    // In-memory room storage with subscriber sets
    const rooms = new Map<string, Set<MessageHandler>>()

    function getOrCreateRoom(roomId: string): Set<MessageHandler> {
      let subscribers = rooms.get(roomId)
      if (!subscribers) {
        subscribers = new Set()
        rooms.set(roomId, subscribers)
      }
      return subscribers
    }

    return {
      /**
       * Get a room instance with subscribe/publish capabilities.
       */
      getRoom(roomId: string): RoomInstance {
        const subscribers = getOrCreateRoom(roomId)

        return {
          name: roomId,

          subscribe(handler: MessageHandler) {
            subscribers.add(handler)
            return () => subscribers.delete(handler)
          },

          publish(msg: ChatMessage) {
            for (const handler of subscribers) {
              handler(msg)
            }
          },
        }
      },

      /**
       * Announce a user joining.
       */
      announceJoin(roomId: string, username: string): void {
        const room = this.getRoom(roomId)
        room.publish({
          type: 'joined',
          username,
          content: undefined,
          timestamp: new Date().toISOString(),
        })
      },

      /**
       * Announce a user leaving.
       */
      announceLeave(roomId: string, username: string): void {
        const room = this.getRoom(roomId)
        room.publish({
          type: 'left',
          username,
          content: undefined,
          timestamp: new Date().toISOString(),
        })

        // Cleanup empty rooms
        const subscribers = rooms.get(roomId)
        if (subscribers && subscribers.size === 0) {
          rooms.delete(roomId)
        }
      },
    }
  },
}) {}

Server Setup

WebSocket routes work with @justscale/core/cluster. WebSocket connections share the same HTTP port:

Files
src/index.tsTypeScript
import JustScale from '@justscale/core'
import { ChatController } from './controllers/chat'
import { RoomService } from './services/room-service'

const app = JustScale()
  .add(RoomService)
  .add(ChatController)
  .build()

await app.serve({ http: 3000 })
console.log('Chat server running on http://localhost:3000')
console.log(
  'WebSocket: ws://localhost:3000/chat/room/:roomId?username=YourName',
)

Handler Context

WebSocket handlers receive a context object with:

  • messages - Async iterable of incoming messages (typed via .message())
  • send - Function to send messages to the client
  • params - URL path parameters (e.g., roomId)
  • query - URL query parameters (e.g., username)
  • Middleware context - Properties added by .use()

Using the Chat

Connect with any WebSocket client:

JavaScript
// Browser
const ws = new WebSocket('ws://localhost:3000/chat/room/general?username=Alice');

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  console.log(`[${msg.username}]: ${msg.content}`);
};

// Send a message
ws.send(JSON.stringify({ type: 'message', content: 'Hello everyone!' }));

// Typing indicator
ws.send(JSON.stringify({ type: 'typing' }));

With Authentication

Chain requireAuth guard to protect WebSocket endpoints:

TypeScript
import { Ws } from '@justscale/websocket';
import { requireAuth } from '@justscale/auth';

Ws('/chat/:roomId')
  .guard(requireAuth)  // Validates token from query param or header
  .message(MessageSchema)
  .handle(async ({ messages, send, user }) => {
    // user is authenticated
    for await (const msg of messages) {
      send({ from: user.email, ...msg });
    }
  });

Tokens can be passed via Authorization header or ?token= query parameter.