Rooms & Broadcasting
Implement real-time rooms with pub/sub channels
Real-time applications often need to group connections into "rooms" and broadcast messages to all members. JustScale combines @justscale/channelwith WebSocket for scalable, cluster-aware rooms.
Room Architecture
The recommended pattern uses channels as the pub/sub layer:
- Channels - Handle message distribution (pub/sub)
- Service - Business logic, room state, user tracking
- Controller - WebSocket endpoints, connect channels to clients
import { createChannels } from '@justscale/channel';
// Message type for room events
export interface RoomMessage {
type: 'message' | 'join' | 'leave' | 'typing';
username: string;
content?: string;
timestamp: number;
}
// Create channels service
export const RoomChannels = createChannels<RoomMessage>();Basic Room Implementation
A simple chat room that joins users and broadcasts messages:
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { z } from 'zod';
import { ChatService } from './chat-service';
const MessageSchema = z.object({
type: z.enum(['message', 'typing']),
content: z.string().optional(),
});
export const ChatController = createController('/chat', {
inject: { chat: ChatService },
routes: (services) => ({
room: Ws('/room/:roomId')
.message(MessageSchema)
.handle(async ({ messages, send, params, query }) => {
const { roomId } = params;
const username = query.username || 'Anonymous';
// Subscribe to room events
const subscription = services.chat.subscribe(roomId);
// Broadcast join
services.chat.broadcast(roomId, {
type: 'join',
username,
timestamp: Date.now(),
});
// Forward room messages to client (concurrent)
const forwardMessages = async () => {
for await (const msg of subscription) {
send(msg);
}
};
// Handle client messages
const handleMessages = async () => {
for await (const msg of messages) {
services.chat.broadcast(roomId, {
type: msg.type,
username,
content: msg.content,
timestamp: Date.now(),
});
}
};
// Run both concurrently until one exits
await Promise.race([forwardMessages(), handleMessages()]);
// Cleanup: unsubscribe and announce leave
subscription.unsubscribe();
services.chat.broadcast(roomId, {
type: 'leave',
username,
timestamp: Date.now(),
});
}),
}),
});Tracking Room Members
To show who's online, track members in the service:
import { createService } from '@justscale/core';
import { RoomChannels, type RoomMessage } from './room-channels';
interface RoomMember {
username: string;
joinedAt: number;
}
export const ChatService = createService({
inject: { channels: RoomChannels },
factory: ({ channels }) => {
// Track members per room (this node only)
const members = new Map<string, Map<string, RoomMember>>();
return {
subscribe: (roomId: string) => channels.subscribe(roomId),
broadcast: (roomId: string, msg: RoomMessage) => channels.publish(roomId, msg),
join(roomId: string, username: string): Disposable {
if (!members.has(roomId)) {
members.set(roomId, new Map());
}
members.get(roomId)!.set(username, {
username,
joinedAt: Date.now(),
});
return {
[Symbol.dispose]: () => {
members.get(roomId)?.delete(username);
if (members.get(roomId)?.size === 0) {
members.delete(roomId);
}
},
};
},
getMembers(roomId: string): RoomMember[] {
return Array.from(members.get(roomId)?.values() ?? []);
},
};
},
});Private Messages
For direct messages between users, create user-specific channels:
import { createChannels } from '@justscale/channel';
import { createService } from '@justscale/core';
interface DirectMessage {
from: string;
content: string;
timestamp: number;
}
// Channel per user for their DMs
const UserChannels = createChannels<DirectMessage>();
export const DMService = createService({
inject: { channels: UserChannels },
factory: ({ channels }) => ({
// Subscribe to your own DM channel
subscribeToMessages(userId: string) {
return channels.subscribe(`user:${userId}`);
},
// Send DM to another user
sendMessage(toUserId: string, from: string, content: string) {
channels.publish(`user:${toUserId}`, {
from,
content,
timestamp: Date.now(),
});
},
}),
});Cluster-Aware Rooms
For multi-node deployments, use channel hooks to broadcast across the cluster via the event bus:
import { createChannels } from '@justscale/channel';
import { createEvents } from '@justscale/event';
import type { RoomMessage } from './types';
// Event bus for cross-cluster messaging
export const ChatEvents = createEvents<{
'room.message': { roomId: string } & RoomMessage;
'room.join': { roomId: string } & RoomMessage;
'room.leave': { roomId: string } & RoomMessage;
}>();
// Channels with cluster hooks
export const RoomChannels = createChannels<RoomMessage>().withHooks({
onPublish: (roomId, msg) => {
// Broadcast to other cluster nodes
ChatEvents.emit(`room.${msg.type}`, { roomId, ...msg });
},
});Tip
deliverRemote() delivers to local subscribers without triggering the onPublish hook, preventing infinite loops.Room Presence
For presence features (online status, last seen), combine member tracking with periodic heartbeats:
import { createService } from '@justscale/core';
interface PresenceInfo {
status: 'online' | 'away' | 'busy';
lastSeen: number;
}
export const PresenceService = createService({
inject: {},
factory: () => {
const presence = new Map<string, PresenceInfo>();
return {
setPresence(userId: string, status: PresenceInfo['status']) {
presence.set(userId, {
status,
lastSeen: Date.now(),
});
},
heartbeat(userId: string) {
const info = presence.get(userId);
if (info) {
info.lastSeen = Date.now();
}
},
getPresence(userId: string): PresenceInfo | null {
return presence.get(userId) ?? null;
},
removePresence(userId: string) {
presence.delete(userId);
},
// Clean up stale entries (call periodically)
cleanup(maxAge = 60000) {
const now = Date.now();
for (const [userId, info] of presence) {
if (now - info.lastSeen > maxAge) {
presence.delete(userId);
}
}
},
};
},
});Excluding Sender
Sometimes you don't want to echo messages back to the sender. Track the sender's subscription and filter:
// Option 1: Filter in the forwarding loop
const forwardMessages = async () => {
for await (const msg of subscription) {
// Don't echo own messages
if (msg.username !== username) {
send(msg);
}
}
};
// Option 2: Send directly to sender, broadcast to others
services.chat.broadcast(roomId, {
type: 'message',
username,
content: msg.content,
timestamp: Date.now(),
});
// Send confirmation to sender only
send({ type: 'sent', messageId: generateId() });