Rooms & Broadcasting
Implement real-time rooms with pub/sub channels
Real-time applications often need to group connections into "rooms" and broadcast messages to all members. JustScale combines @justscale/core with WebSocket for scalable, cluster-aware rooms.
Room Architecture
The recommended pattern uses channels as the pub/sub layer:
- Channels - Handle message distribution (pub/sub)
- Service - Business logic, room state, user tracking
- Controller - WebSocket endpoints, connect channels to clients
import { defineModel, field } from '@justscale/core/models';
/**
 * The domain entity rooms are keyed on. Services and channels take a
 * `Ref<Room>`, never a raw string ID.
 */
export class Room extends defineModel({
  name: 'Room',
  fields: {
    // String field declared with max(100).
    name: field.string().max(100),
  },
}) {}

Basic Room Implementation
A simple chat room that joins users and broadcasts messages:
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { z } from 'zod';
import { ChatService } from './chat-service';
import { Room } from './room.model';
// Schema for messages a client sends over the socket: either a chat
// 'message' or a 'typing' notification. `content` is optional.
const MessageSchema = z.object({
  type: z.enum(['message', 'typing']),
  content: z.string().optional(),
});
/**
 * WebSocket chat endpoint mounted at `/chat/room/:roomId`.
 *
 * For each connection the handler subscribes to the room, announces a
 * `join`, then concurrently forwards room messages to the client and
 * re-broadcasts the client's messages to the room. When either loop ends
 * (or throws) the subscription is released and a `leave` is announced.
 */
export const ChatController = createController('/chat', {
  inject: { chat: ChatService },
  routes: (services) => ({
    room: Ws('/room/:roomId')
      .message(MessageSchema)
      .handle(async ({ messages, send, params, query }) => {
        // Boundary: raw path param -> typed reference.
        const room = Room.ref`${params.roomId}`;
        // `||` (not `??`) so an empty `?username=` also falls back.
        const username = query.username || 'Anonymous';

        // Subscribe to room events
        const subscription = services.chat.subscribe(room);

        try {
          // Broadcast join
          services.chat.broadcast(room, {
            type: 'join',
            username,
            timestamp: Date.now(),
          });

          // Forward room messages to client (concurrent)
          const forwardMessages = async () => {
            for await (const msg of subscription) {
              send(msg);
            }
          };

          // Handle client messages
          const handleMessages = async () => {
            for await (const msg of messages) {
              services.chat.broadcast(room, {
                type: msg.type,
                username,
                content: msg.content,
                timestamp: Date.now(),
              });
            }
          };

          // Run both concurrently until one exits
          await Promise.race([forwardMessages(), handleMessages()]);
        } finally {
          // Cleanup runs even when a loop throws, so the subscription is
          // never leaked and the room always sees the user leave.
          // Unsubscribe first so this client is not sent its own `leave`.
          subscription.unsubscribe();
          services.chat.broadcast(room, {
            type: 'leave',
            username,
            timestamp: Date.now(),
          });
        }
      }),
  }),
});

Tracking Room Members
To show who's online, track members in the service:
import { defineService } from '@justscale/core';
import type { Ref } from '@justscale/core/models';
import { RoomChannels, type RoomMessage } from './room-channels';
import { Room } from './room.model';
/** A user currently present in a room, as tracked in memory by the service. */
interface RoomMember {
  /** Name the user joined with; also the key the member is stored under. */
  username: string;
  /** `Date.now()` timestamp (ms since epoch) recorded when the user joined. */
  joinedAt: number;
}
/**
 * Chat business logic: thin wrappers over the room channels plus an
 * in-memory roster of who has joined each room.
 */
export class ChatService extends defineService({
  inject: { channels: RoomChannels },
  factory: ({ channels }) => {
    // Track members per room (this node only). Keyed by the room's
    // identifier so we can hash in a Map; domain callers still pass Ref<Room>.
    const members = new Map<string, Map<string, RoomMember>>();

    return {
      /** Subscribes to messages published to `room`. */
      subscribe: (room: Ref<Room>) => channels.subscribe(room),

      /** Publishes `msg` to `room`. */
      broadcast: (room: Ref<Room>, msg: RoomMessage) => channels.publish(room, msg),

      /**
       * Records `username` as a member of `room`.
       *
       * @returns A `Disposable` that removes the membership it created.
       *   Disposing is idempotent, and a stale handle never evicts a newer
       *   entry registered under the same username.
       */
      join(room: Ref<Room>, username: string): Disposable {
        const key = room.identifier;

        let roster = members.get(key);
        if (!roster) {
          roster = new Map();
          members.set(key, roster);
        }

        const entry: RoomMember = { username, joinedAt: Date.now() };
        roster.set(username, entry);

        return {
          [Symbol.dispose]: () => {
            // Look the roster up again: it may have been dropped and
            // recreated since this handle was issued.
            const current = members.get(key);
            // Only remove the exact entry this join created.
            if (!current || current.get(username) !== entry) {
              return;
            }
            current.delete(username);
            if (current.size === 0) {
              members.delete(key);
            }
          },
        };
      },

      /** Returns the members currently recorded for `room` (empty if none). */
      getMembers(room: Ref<Room>): RoomMember[] {
        return Array.from(members.get(room.identifier)?.values() ?? []);
      },
    };
  },
}) {}

Private Messages
For direct messages between users, create user-specific channels:
import { createChannels, defineService } from '@justscale/core';
import type { Ref } from '@justscale/core/models';
import { User } from './user.model';
/** Payload delivered on a user's direct-message channel. */
interface DirectMessage {
  /** Reference to the user who sent the message. */
  from: Ref<User>;
  /** Message text. */
  content: string;
  /** `Date.now()` timestamp (ms since epoch) set when the message is sent. */
  timestamp: number;
}
// One channel per user for their DMs, addressed with a `Ref<User>`.
// NOTE(review): the 'user:' prefix presumably namespaces the underlying
// channel keys; confirm against the createChannels documentation.
const UserChannels = createChannels<DirectMessage>({ prefix: 'user:' });
/** Direct messaging between users over per-user channels. */
export class DMService extends defineService({
  inject: { channels: UserChannels },
  factory: ({ channels }) => {
    /** Subscribes to the DM channel belonging to `user`. */
    const subscribeToMessages = (user: Ref<User>) => channels.subscribe(user);

    /** Publishes a DM from `from` onto the channel belonging to `to`. */
    const sendMessage = (to: Ref<User>, from: Ref<User>, content: string): void => {
      const dm: DirectMessage = { from, content, timestamp: Date.now() };
      channels.publish(to, dm);
    };

    return { subscribeToMessages, sendMessage };
  },
}) {}

Cluster-Aware Rooms
For multi-node deployments, use channel hooks to broadcast across the cluster via the event bus:
import { createChannels } from '@justscale/core';
import { createEventBus } from '@justscale/event';
import { z } from 'zod';
import { Room } from './room.model';
import type { RoomMessage } from './types';
// Event bus for cross-cluster messaging. The wire schema carries the
// room's identifier as a string; references get rebuilt at the
// boundary (hook / handler) with Room.ref`${roomKey}`.
const RoomPayload = z.object({
  roomKey: z.string(),
  // Covers every type that gets published to a room: clients send
  // 'message' and 'typing', the WebSocket handler adds 'join' and 'leave'.
  type: z.enum(['message', 'typing', 'join', 'leave']),
  // ...other RoomMessage fields
});

// One event per message type, so that `room.${msg.type}` in the publish
// hook below always names a declared event.
export const ChatEvents = createEventBus({
  'room.message': RoomPayload,
  'room.typing': RoomPayload,
  'room.join': RoomPayload,
  'room.leave': RoomPayload,
});

// Channels with cluster hooks. The hook is the infra boundary: pass
// the raw key across the wire, rebuild a Ref<Room> on the other side.
export const RoomChannels = createChannels<RoomMessage>().withHooks({
  // Mirror every local publish onto the event bus.
  onPublish: (key, msg) => {
    ChatEvents.emit(`room.${msg.type}`, { roomKey: key, ...msg });
  },
});

Tip
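deliverRemote() delivers to local subscribers without triggering the onPublish hook. Use it, rather than publish(), when relaying a message that arrived from another node over the event bus: publishing it again would re-emit the event and cause an infinite loop.

Room Presence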
For presence features (online status, last seen), combine member tracking with periodic heartbeats:
import { defineService } from '@justscale/core';
import type { Ref } from '@justscale/core/models';
import { User } from './user.model';
/** Presence record stored per user. */
interface PresenceInfo {
  /** Availability most recently set for the user. */
  status: 'online' | 'away' | 'busy';
  /** `Date.now()` timestamp (ms) of the last status change or heartbeat. */
  lastSeen: number;
}
/** In-memory presence registry: status plus last-seen time per user. */
export class PresenceService extends defineService({
  inject: {},
  factory: () => {
    // Keyed by user identifier so we can Map.get; callers pass Ref<User>.
    const presence = new Map<string, PresenceInfo>();

    return {
      /** Stores `status` for `user` and stamps the current time. */
      setPresence(user: Ref<User>, status: PresenceInfo['status']) {
        const record: PresenceInfo = { status, lastSeen: Date.now() };
        presence.set(user.identifier, record);
      },

      /** Refreshes `lastSeen` for a known user; unknown users are ignored. */
      heartbeat(user: Ref<User>) {
        const record = presence.get(user.identifier);
        if (!record) {
          return;
        }
        record.lastSeen = Date.now();
      },

      /** Returns the stored presence for `user`, or `null` when absent. */
      getPresence(user: Ref<User>): PresenceInfo | null {
        const record = presence.get(user.identifier);
        return record ?? null;
      },

      /** Forgets any presence stored for `user`. */
      removePresence(user: Ref<User>) {
        presence.delete(user.identifier);
      },

      /**
       * Drops entries whose `lastSeen` is more than `maxAge` ms old.
       * Intended to be called periodically.
       */
      cleanup(maxAge = 60000) {
        const now = Date.now();
        presence.forEach((record, id) => {
          const age = now - record.lastSeen;
          if (age > maxAge) {
            presence.delete(id);
          }
        });
      },
    };
  },
}) {}

Excluding Sender
Sometimes you don't want to echo messages back to the sender. Either filter the sender's own messages out of the forwarding loop, or broadcast as usual and send the sender a separate confirmation:
// Option 1: Filter in the forwarding loop
/** Relays room messages to this client, skipping the ones it sent itself. */
const forwardMessages = async () => {
  for await (const incoming of subscription) {
    // Don't echo own messages
    const isOwnMessage = incoming.username === username;
    if (isOwnMessage) {
      continue;
    }
    send(incoming);
  }
};
// Option 2: Send directly to sender, broadcast to others.
// `room` here is the Ref<Room> built at the controller boundary.
// NOTE(review): broadcast() publishes to the whole room, so the sender's
// own subscription presumably still receives this message unless it is
// also filtered as in Option 1. Confirm the intended behavior.
services.chat.broadcast(room, {
  type: 'message',
  username,
  content: msg.content,
  timestamp: Date.now(),
});
// Send confirmation to sender only
// (`generateId` is not defined in this snippet; assumed to be supplied by
// the application.)
send({ type: 'sent', messageId: generateId() });