Channels
Typed pub/sub messaging for real-time features
The @justscale/channel package provides a typed pub/sub system for building real-time features. Channels are the foundation for WebSocket broadcasting, event streaming, and cluster-aware messaging.
Installation
npm install @justscale/channel
Basic Usage
Create a channels service with createChannels<TMessage>() and inject it into your services:
import { createChannels } from '@justscale/channel';
// Define message type for type-safe pub/sub
interface RoomMessage {
  type: 'message' | 'join' | 'leave';
  username: string;
  content?: string;
  timestamp: number;
}
// Create typed channels
export const RoomChannels = createChannels<RoomMessage>();
Subscribing to Channels
The subscribe() method returns an async iterable that yields messages as they arrive. The subscription automatically cleans up when the loop exits:
// Subscribe returns AsyncIterable + Disposable
const subscription = channels.subscribe('room-123');
// Stream messages with for-await
for await (const msg of subscription) {
  console.log(msg.type, msg.username);
  // Break to unsubscribe
  if (msg.type === 'leave') break;
}
// Subscription automatically cleaned up
// Or manually unsubscribe
subscription.unsubscribe();
Subscriptions implement Disposable, so you can use the using keyword for automatic cleanup:
async function handleConnection(roomId: string) {
  // Subscription auto-disposes when function exits
  using subscription = channels.subscribe(roomId);
  for await (const msg of subscription) {
    client.send(msg);
  }
}
Publishing Messages
Use publish() to broadcast messages to all subscribers of a channel:
// Broadcast to all subscribers of 'room-123'
channels.publish('room-123', {
  type: 'message',
  username: 'alice',
  content: 'Hello everyone!',
  timestamp: Date.now(),
});
// Only subscribers receive the message
// No subscribers? Message is dropped (fire-and-forget)
Channel API
The channels instance provides these methods:
subscribe(key) - Subscribe to a channel; returns an async iterable
publish(key, msg) - Broadcast to all local subscribers
deliverRemote(key, msg) - Deliver a message from a remote node (for clusters)
hasSubscribers(key) - Check whether a channel has any subscribers
getActiveChannels() - List all channels with subscribers
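To make the semantics of the methods above concrete, here is a toy in-memory model. It is only a sketch under stated assumptions and not the @justscale/channel implementation: ToyChannels and ToyHooks are hypothetical names, subscribers are plain callbacks rather than async iterables (to keep the example synchronous), and firing onPublish even when nobody is subscribed is a guess about hook behavior.

```typescript
// Toy model for illustration only; NOT the @justscale/channel implementation.
type Listener<T> = (msg: T) => void;

interface ToyHooks<T> {
  onFirstSubscriber?: (key: string) => void;
  onLastUnsubscribe?: (key: string) => void;
  onPublish?: (key: string, msg: T) => void;
}

class ToyChannels<T> {
  private readonly listeners = new Map<string, Set<Listener<T>>>();
  private readonly hooks: ToyHooks<T>;

  constructor(hooks: ToyHooks<T> = {}) {
    this.hooks = hooks;
  }

  // Register a callback; the returned function unsubscribes it.
  subscribe(key: string, listener: Listener<T>): () => void {
    let set = this.listeners.get(key);
    if (!set) {
      set = new Set<Listener<T>>();
      this.listeners.set(key, set);
      this.hooks.onFirstSubscriber?.(key); // first subscriber on this node
    }
    set.add(listener);
    return () => {
      const current = this.listeners.get(key);
      if (!current || !current.delete(listener)) return; // already removed
      if (current.size === 0) {
        this.listeners.delete(key);
        this.hooks.onLastUnsubscribe?.(key); // last subscriber left
      }
    };
  }

  // Local publish: deliver to local subscribers, then notify the hook.
  publish(key: string, msg: T): void {
    this.deliverRemote(key, msg);
    this.hooks.onPublish?.(key, msg);
  }

  // Remote delivery: local subscribers only, onPublish is not called.
  deliverRemote(key: string, msg: T): void {
    this.listeners.get(key)?.forEach((listener) => listener(msg));
  }

  hasSubscribers(key: string): boolean {
    return this.listeners.has(key);
  }

  getActiveChannels(): string[] {
    return Array.from(this.listeners.keys());
  }
}

// Usage: record hook calls and deliveries to see the order of events.
const hookLog: string[] = [];
const toy = new ToyChannels<string>({
  onFirstSubscriber: (key) => { hookLog.push(`first:${key}`); },
  onLastUnsubscribe: (key) => { hookLog.push(`last:${key}`); },
  onPublish: (key, msg) => { hookLog.push(`publish:${key}:${msg}`); },
});
const received: string[] = [];
const unsubscribe = toy.subscribe('room-1', (msg) => { received.push(msg); });
toy.publish('room-1', 'hello');             // delivered locally, fires onPublish
toy.deliverRemote('room-1', 'from-remote'); // delivered locally, no onPublish
unsubscribe();                              // fires onLastUnsubscribe
toy.publish('room-1', 'dropped');           // no subscribers: nothing delivered
```

In this model publish() is deliverRemote() plus the onPublish hook, so a node that relays a remote message through deliverRemote() cannot echo it back to the cluster.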
Cluster Integration
For multi-node deployments, use hooks to synchronize channels across the cluster. Hooks are called when subscription state changes:
import { createChannels } from '@justscale/channel';
import { ChatEvents } from './events';
export const RoomChannels = createChannels<RoomMessage>().withHooks({
  // Called when first subscriber joins on this node
  onFirstSubscriber: (roomId) => {
    console.log(`First subscriber to ${roomId}`);
    // Register interest at cluster level
  },
  // Called when last subscriber leaves on this node
  onLastUnsubscribe: (roomId) => {
    console.log(`No more subscribers to ${roomId}`);
    // Unregister from cluster
  },
  // Called when publishing locally
  onPublish: (roomId, msg) => {
    // Broadcast to other cluster nodes via event bus
    ChatEvents.emit(`room.${msg.type}`, { roomId, ...msg });
  },
});
Handling Remote Messages
Use deliverRemote() to deliver messages that arrive from other cluster nodes. It delivers only to local subscribers and does not re-trigger the onPublish hook, which avoids rebroadcasting the message to the cluster:
import { createController } from '@justscale/core';
import { ChatEvents } from './events';
import { RoomChannels } from './room-channels';
export const ChatEventController = createController('/', {
  inject: { channels: RoomChannels },
  routes: (services) => ({
    // Handle events from other cluster nodes
    onRoomEvent: ChatEvents.on('room.*').handle(({ payload }) => {
      const { roomId, ...message } = payload;
      // Deliver to local subscribers only
      services.channels.deliverRemote(roomId, message);
    }),
  }),
});
With WebSocket
Channels integrate naturally with WebSocket for real-time streaming:
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { ChatService } from './chat-service';
export const ChatController = createController('/chat', {
  inject: { chat: ChatService },
  routes: (services) => ({
    room: Ws('/room/:roomId').handle(async ({ messages, send, params }) => {
      // Subscribe to room messages
      const subscription = services.chat.subscribe(params.roomId);
      // Forward channel messages to WebSocket client
      // This runs concurrently with message handling
      const streamMessages = async () => {
        for await (const msg of subscription) {
          send(msg);
        }
      };
      // Handle incoming WebSocket messages
      const handleMessages = async () => {
        for await (const msg of messages) {
          services.chat.broadcast(params.roomId, {
            type: 'message',
            username: msg.username,
            content: msg.content,
            timestamp: Date.now(),
          });
        }
      };
      try {
        // Run both concurrently; continue as soon as either loop ends
        await Promise.race([streamMessages(), handleMessages()]);
      } finally {
        // Clean up the subscription even if one of the loops throws
        subscription.unsubscribe();
      }
    }),
  }),
});
Type Safety
Channels are fully typed. The message type flows through to subscriptions:
interface ChatMessage {
  type: 'text' | 'image' | 'system';
  sender: string;
  content: string;
}
const ChatChannels = createChannels<ChatMessage>();
// Subscription is typed (channels is the injected ChatChannels instance)
const sub = channels.subscribe('room-1');
for await (const msg of sub) {
  // msg is ChatMessage
  if (msg.type === 'text') {
    console.log(msg.sender, msg.content);
  }
}
// Publish requires correct type
channels.publish('room-1', {
  type: 'text',
  sender: 'alice',
  content: 'Hello!',
});
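Because the message type reaches every subscriber, it can pay to model messages as a discriminated union so that each type value carries its own fields. The sketch below is one possible approach, not something the package requires: TaggedChatMessage, formatMessage, and the url field are hypothetical names introduced for illustration.

```typescript
// Hypothetical variant of ChatMessage: each `type` carries its own fields.
type TaggedChatMessage =
  | { type: 'text'; sender: string; content: string }
  | { type: 'image'; sender: string; url: string }
  | { type: 'system'; content: string };

// Format a message for display; the switch narrows `msg` in each branch.
function formatMessage(msg: TaggedChatMessage): string {
  switch (msg.type) {
    case 'text':
      return `${msg.sender}: ${msg.content}`;
    case 'image':
      return `${msg.sender} sent an image (${msg.url})`;
    case 'system':
      return `[system] ${msg.content}`;
    default: {
      // Exhaustiveness check: a new `type` without a case fails to compile here.
      const unreachable: never = msg;
      return unreachable;
    }
  }
}
```

A type like this could be passed to createChannels<TaggedChatMessage>() in place of the interface, and a subscriber loop would call formatMessage(msg) on each message.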