Skip to content

Channels

Typed pub/sub messaging across services and nodes

Channels provide distributed pub/sub messaging as a first-class primitive. A channel is cluster-aware by default — subscribers receive messages whether they live in the same process or on a different node. This makes channels the foundation for WebSocket broadcasting, event streaming, and any cross-service communication.

Installation

Bash
npm install @justscale/core

Basic Usage

Channels key off references, not strings. Define a model for the thing you're broadcasting to (here, Room), then create a channels service with createChannels<TMessage>() and inject it into your services:

Files
room.model.tsTypeScript
import { defineModel, field } from '@justscale/core/models';

// The domain entity channel subscribers and publishers refer to.
export class Room extends defineModel({
  name: 'Room',
  fields: {
    name: field.string().max(100),
  },
}) {}

Subscribing to Channels

The subscribe() method returns an async iterable that yields messages as they arrive. The subscription automatically cleans up when the loop exits:

subscription-example.tsTypeScript
import { Room } from './room.model';

// Subscribe returns AsyncIterable + Disposable
const subscription = channels.subscribe(Room.ref`123`);

// Stream messages with for-await
for await (const msg of subscription) {
  console.log(msg.type, msg.username);

  // Break to unsubscribe
  if (msg.type === 'leave') break;
}
// Subscription automatically cleaned up

// Or manually unsubscribe
subscription.unsubscribe();

Subscriptions implement Disposable, so you can use the using keyword for automatic cleanup:

using-example.tsTypeScript
import type { Ref } from '@justscale/core/models';
import { Room } from './room.model';

async function handleConnection(room: Ref<Room>) {
  // Subscription auto-disposes when function exits
  using subscription = channels.subscribe(room);

  for await (const msg of subscription) {
    client.send(msg);
  }
}

Publishing Messages

Use publish() to broadcast messages to all subscribers of a channel:

publish-example.tsTypeScript
import { Room } from './room.model';

// Broadcast to all subscribers of Room(123)
channels.publish(Room.ref`123`, {
  type: 'message',
  username: 'alice',
  content: 'Hello everyone!',
  timestamp: Date.now(),
});

// Only subscribers receive the message
// No subscribers? Message is dropped (fire-and-forget)

Channel API

The channels instance provides these methods:

  • subscribe(key) - Subscribe to a channel, returns async iterable
  • publish(key, msg) - Broadcast to all local subscribers
  • deliverRemote(key, msg) - Deliver message from remote node (for clusters)
  • hasSubscribers(key) - Check if channel has any subscribers
  • getActiveChannels() - List all channels with subscribers

Cluster Integration

For multi-node deployments, use hooks to synchronize channels across the cluster. Hooks are called when subscription state changes:

cluster-channels.tsTypeScript
import { createChannels } from '@justscale/core';
import { ChatEvents } from './events';
import { Room } from './room.model';

export const RoomChannels = createChannels<RoomMessage>().withHooks({
  // Called when first subscriber joins on this node.
  // Hooks sit at the infrastructure boundary — rebuild a typed ref
  // from the channel key before doing any domain work.
  onFirstSubscriber: (key) => {
    const room = Room.ref`${key}`;
    console.log('First subscriber to', room);
    // Register interest in `room` at cluster level
  },

  // Called when last subscriber leaves on this node
  onLastUnsubscribe: (key) => {
    const room = Room.ref`${key}`;
    console.log('No more subscribers to', room);
    // Unregister `room` from cluster
  },

  // Called when publishing locally
  onPublish: (key, msg) => {
    const room = Room.ref`${key}`;
    // Broadcast to other cluster nodes via event bus
    ChatEvents.emit(`room.${msg.type}`, { room, ...msg });
  },
});

Handling Remote Messages

Use deliverRemote() to deliver messages from other cluster nodes. This only delivers to local subscribers without re-triggering the onPublish hook:

event-handler.tsTypeScript
import { createController } from '@justscale/core';
import { ChatEvents } from './events';
import { RoomChannels } from './room-channels';

export const ChatEventController = createController('/', {
  inject: { channels: RoomChannels },
  routes: (services) => ({
    // Handle events from other cluster nodes
    onRoomEvent: ChatEvents.on('room.*').handle(({ payload }) => {
      const { room, ...message } = payload;
      // Deliver to local subscribers only. `room` is a Ref<Room>
      // carried through the event bus — channels accept refs directly.
      services.channels.deliverRemote(room, message);
    }),
  }),
});

With WebSocket

Channels integrate naturally with WebSocket for real-time streaming:

websocket-channels.tsTypeScript
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { ChatService } from './chat-service';
import { Room } from './room.model';

export const ChatController = createController('/chat', {
  inject: { chat: ChatService },
  routes: (services) => ({
    room: Ws('/room/:roomId').handle(async ({ messages, send, params }) => {
      // Boundary: turn the raw path param into a typed reference.
      const room = Room.ref`${params.roomId}`;

      // Subscribe to room messages
      const subscription = services.chat.subscribe(room);

      // Forward channel messages to WebSocket client
      // This runs concurrently with message handling
      const streamMessages = async () => {
        for await (const msg of subscription) {
          send(msg);
        }
      };

      // Handle incoming WebSocket messages
      const handleMessages = async () => {
        for await (const msg of messages) {
          services.chat.broadcast(room, {
            type: 'message',
            username: msg.username,
            content: msg.content,
            timestamp: Date.now(),
          });
        }
      };

      // Run both concurrently
      await Promise.race([streamMessages(), handleMessages()]);

      // Cleanup subscription
      subscription.unsubscribe();
    }),
  }),
});
💡

Tip

See the WebSocket Rooms & Broadcasting guide for more patterns on combining channels with WebSocket.

Type Safety

Channels are fully typed. The message type flows through to subscriptions:

typed-channels.tsTypeScript
import { Room } from './room.model';

interface ChatMessage {
  type: 'text' | 'image' | 'system';
  sender: string;
  content: string;
}

const ChatChannels = createChannels<ChatMessage>();

// Subscription is typed
const sub = channels.subscribe(Room.ref`1`);
for await (const msg of sub) {
  // msg is ChatMessage
  if (msg.type === 'text') {
    console.log(msg.sender, msg.content);
  }
}

// Publish requires correct type
channels.publish(Room.ref`1`, {
  type: 'text',
  sender: 'alice',
  content: 'Hello!',
});