Channels

Typed pub/sub messaging for real-time features

The @justscale/channel package provides a typed pub/sub system for building real-time features. Channels are the foundation for WebSocket broadcasting, event streaming, and cluster-aware messaging.

Installation

Bash
npm install @justscale/channel

Basic Usage

Create a channels service with createChannels<TMessage>()and inject it into your services:

Files
room-channels.tsTypeScript
import { createChannels } from '@justscale/channel';

// Define message type for type-safe pub/sub
interface RoomMessage {
  type: 'message' | 'join' | 'leave';
  username: string;
  content?: string;
  timestamp: number;
}

// Create typed channels
export const RoomChannels = createChannels<RoomMessage>();

Subscribing to Channels

The subscribe() method returns an async iterable that yields messages as they arrive. The subscription automatically cleans up when the loop exits:

subscription-example.tsTypeScript
// Subscribe returns AsyncIterable + Disposable
const subscription = channels.subscribe('room-123');

// Stream messages with for-await
for await (const msg of subscription) {
  console.log(msg.type, msg.username);

  // Break to unsubscribe
  if (msg.type === 'leave') break;
}
// Subscription automatically cleaned up

// Or manually unsubscribe
subscription.unsubscribe();

Subscriptions implement Disposable, so you can use the using keyword for automatic cleanup:

using-example.tsTypeScript
async function handleConnection(roomId: string) {
  // Subscription auto-disposes when function exits
  using subscription = channels.subscribe(roomId);

  for await (const msg of subscription) {
    client.send(msg);
  }
}

Publishing Messages

Use publish() to broadcast messages to all subscribers of a channel:

publish-example.tsTypeScript
// Broadcast to all subscribers of 'room-123'
channels.publish('room-123', {
  type: 'message',
  username: 'alice',
  content: 'Hello everyone!',
  timestamp: Date.now(),
});

// Only subscribers receive the message
// No subscribers? Message is dropped (fire-and-forget)

Channel API

The channels instance provides these methods:

  • subscribe(key) - Subscribe to a channel, returns async iterable
  • publish(key, msg) - Broadcast to all local subscribers
  • deliverRemote(key, msg) - Deliver message from remote node (for clusters)
  • hasSubscribers(key) - Check if channel has any subscribers
  • getActiveChannels() - List all channels with subscribers

Cluster Integration

For multi-node deployments, use hooks to synchronize channels across the cluster. Hooks are called when subscription state changes:

cluster-channels.tsTypeScript
import { createChannels } from '@justscale/channel';
import { ChatEvents } from './events';

export const RoomChannels = createChannels<RoomMessage>().withHooks({
  // Called when first subscriber joins on this node
  onFirstSubscriber: (roomId) => {
    console.log(`First subscriber to ${roomId}`);
    // Register interest at cluster level
  },

  // Called when last subscriber leaves on this node
  onLastUnsubscribe: (roomId) => {
    console.log(`No more subscribers to ${roomId}`);
    // Unregister from cluster
  },

  // Called when publishing locally
  onPublish: (roomId, msg) => {
    // Broadcast to other cluster nodes via event bus
    ChatEvents.emit(`room.${msg.type}`, { roomId, ...msg });
  },
});

Handling Remote Messages

Use deliverRemote() to deliver messages from other cluster nodes. This only delivers to local subscribers without re-triggering the onPublish hook:

event-handler.tsTypeScript
import { createController } from '@justscale/core';
import { ChatEvents } from './events';
import { RoomChannels } from './room-channels';

export const ChatEventController = createController('/', {
  inject: { channels: RoomChannels },
  routes: (services) => ({
    // Handle events from other cluster nodes
    onRoomEvent: ChatEvents.on('room.*').handle(({ payload }) => {
      const { roomId, ...message } = payload;
      // Deliver to local subscribers only
      services.channels.deliverRemote(roomId, message);
    }),
  }),
});

With WebSocket

Channels integrate naturally with WebSocket for real-time streaming:

websocket-channels.tsTypeScript
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { ChatService } from './chat-service';

export const ChatController = createController('/chat', {
  inject: { chat: ChatService },
  routes: (services) => ({
    room: Ws('/room/:roomId').handle(async ({ messages, send, params }) => {
      // Subscribe to room messages
      const subscription = services.chat.subscribe(params.roomId);

      // Forward channel messages to WebSocket client
      // This runs concurrently with message handling
      const streamMessages = async () => {
        for await (const msg of subscription) {
          send(msg);
        }
      };

      // Handle incoming WebSocket messages
      const handleMessages = async () => {
        for await (const msg of messages) {
          services.chat.broadcast(params.roomId, {
            type: 'message',
            username: msg.username,
            content: msg.content,
            timestamp: Date.now(),
          });
        }
      };

      // Run both concurrently
      await Promise.race([streamMessages(), handleMessages()]);

      // Cleanup subscription
      subscription.unsubscribe();
    }),
  }),
});
💡

Tip

See the WebSocket Rooms & Broadcasting guide for more patterns on combining channels with WebSocket.

Type Safety

Channels are fully typed. The message type flows through to subscriptions:

typed-channels.tsTypeScript
interface ChatMessage {
  type: 'text' | 'image' | 'system';
  sender: string;
  content: string;
}

const ChatChannels = createChannels<ChatMessage>();

// Subscription is typed
const sub = channels.subscribe('room-1');
for await (const msg of sub) {
  // msg is ChatMessage
  if (msg.type === 'text') {
    console.log(msg.sender, msg.content);
  }
}

// Publish requires correct type
channels.publish('room-1', {
  type: 'text',
  sender: 'alice',
  content: 'Hello!',
});