<!-- Markdown mirror of https://justscale.sh/docs/fundamentals/channels -->

# Channels

Typed pub/sub messaging across services and nodes

Channels provide distributed pub/sub messaging as a first-class primitive. A channel is cluster-aware by default — subscribers receive messages whether they live in the same process or on a different node. This makes channels the foundation for WebSocket broadcasting, event streaming, and any cross-service communication.

## Installation

```bash
npm install @justscale/core
```

## Basic Usage

Channels key off references, not strings. Define a model for the thing you're broadcasting to (here, `Room`), then create a channels service with `createChannels()` and inject it into your services:

`room.model.ts`

```typescript
import { defineModel, field } from '@justscale/core/models';

// The domain entity channel subscribers and publishers refer to.
export class Room extends defineModel({
  name: 'Room',
  fields: {
    name: field.string().max(100),
  },
}) {}
```
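
To illustrate the "inject it into your services" step, here is a rough sketch of a service that wraps a channels instance. It uses plain constructor injection and an in-memory stand-in rather than the framework's own dependency injection. `RoomMessage`, `ChannelsLike`, `RecordingChannels`, and this `ChatService` are assumptions made for illustration, and string keys stand in for `Room` references; only the `broadcast` method name is borrowed from the WebSocket example later on this page.

```typescript
// Hypothetical message shape, modeled on the fields used in this page's examples.
interface RoomMessage {
  type: 'message' | 'join' | 'leave';
  username: string;
  content?: string;
  timestamp: number;
}

// The minimal surface this sketch needs from a channels instance (assumed).
interface ChannelsLike<Msg> {
  publish(key: string, msg: Msg): void;
}

// In-memory stand-in that records what was published, for demonstration only.
class RecordingChannels<Msg> implements ChannelsLike<Msg> {
  published: Array<{ key: string; msg: Msg }> = [];

  publish(key: string, msg: Msg): void {
    this.published.push({ key, msg });
  }
}

// A service that hides the channels instance behind a domain-level method.
class ChatService {
  private channels: ChannelsLike<RoomMessage>;

  constructor(channels: ChannelsLike<RoomMessage>) {
    this.channels = channels;
  }

  // Forward the message to the room's channel.
  broadcast(roomKey: string, msg: RoomMessage): void {
    this.channels.publish(roomKey, msg);
  }
}

const fakeChannels = new RecordingChannels<RoomMessage>();
const chat = new ChatService(fakeChannels);
chat.broadcast('123', {
  type: 'message',
  username: 'alice',
  content: 'Hello everyone!',
  timestamp: Date.now(),
});
```

Keeping the channels instance behind a service method means controllers only deal with domain operations such as `broadcast`, and the stand-in can be swapped in for tests.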

## Subscribing to Channels

The `subscribe()` method returns an async iterable that yields messages as they arrive. Exiting the loop, for example with `break`, cleans up the subscription automatically:

`subscription-example.ts`

```typescript
import { Room } from './room.model';

// `channels` is an injected instance of your channels service.
// subscribe() returns an AsyncIterable that is also Disposable
const subscription = channels.subscribe(Room.ref`123`);

// Stream messages with for-await
for await (const msg of subscription) {
  console.log(msg.type, msg.username);

  // Break to unsubscribe
  if (msg.type === 'leave') break;
}
// Subscription automatically cleaned up

// Or manually unsubscribe
subscription.unsubscribe();
```
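
To see why breaking out of the loop is enough to unsubscribe, consider this minimal in-memory sketch. It is not the `@justscale/core` implementation; `LocalChannel`, `subscriberCount`, and `demo` are hypothetical names. The idea is that `for await` calls the iterator's `return()` method on `break`, which runs the generator's `finally` block.

```typescript
// Hypothetical in-memory stand-in; not the @justscale/core implementation.
type Listener<T> = (msg: T) => void;

class LocalChannel<T> {
  private listeners = new Set<Listener<T>>();

  get subscriberCount(): number {
    return this.listeners.size;
  }

  publish(msg: T): void {
    for (const listener of this.listeners) listener(msg);
  }

  subscribe(): AsyncIterable<T> & { unsubscribe(): void } {
    const queue: T[] = [];
    let wake: (() => void) | null = null;
    let closed = false;

    // Register eagerly so messages published before iteration are buffered.
    const listener: Listener<T> = (msg) => {
      queue.push(msg);
      wake?.();
    };
    this.listeners.add(listener);

    const unsubscribe = (): void => {
      closed = true;
      this.listeners.delete(listener);
      wake?.();
    };

    async function* stream(): AsyncGenerator<T> {
      try {
        while (true) {
          while (queue.length > 0) yield queue.shift() as T;
          if (closed) return;
          // Sleep until the next publish or an explicit unsubscribe.
          await new Promise<void>((resolve) => {
            wake = resolve;
          });
          wake = null;
        }
      } finally {
        // Runs on `break`, because for-await calls iterator.return().
        unsubscribe();
      }
    }

    const iterator = stream();
    return { [Symbol.asyncIterator]: () => iterator, unsubscribe };
  }
}

async function demo(): Promise<string[]> {
  const room = new LocalChannel<{ type: string; username: string }>();
  const seen: string[] = [];
  const subscription = room.subscribe();

  room.publish({ type: 'join', username: 'alice' });
  room.publish({ type: 'leave', username: 'alice' });
  room.publish({ type: 'join', username: 'bob' }); // buffered, never consumed

  for await (const msg of subscription) {
    seen.push(`${msg.type}:${msg.username}`);
    if (msg.type === 'leave') break;
  }
  seen.push(`subscribers:${room.subscriberCount}`);
  return seen; // ['join:alice', 'leave:alice', 'subscribers:0']
}
```

Calling `demo()` resolves with the two consumed messages followed by a subscriber count of zero, showing that the listener was removed without an explicit `unsubscribe()` call.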

Subscriptions implement `Disposable`, so you can use the `using` keyword for automatic cleanup:

`using-example.ts`

```typescript
import type { Ref } from '@justscale/core/models';
import { Room } from './room.model';

async function handleConnection(room: Ref<Room>) {
  // `channels` and `client` are assumed to be in scope.
  // The subscription is disposed automatically when the function exits.
  using subscription = channels.subscribe(room);

  for await (const msg of subscription) {
    client.send(msg);
  }
}
```

## Publishing Messages

Use `publish()` to broadcast messages to all subscribers of a channel:

`publish-example.ts`

```typescript
import { Room } from './room.model';

// Broadcast to all subscribers of Room(123)
channels.publish(Room.ref`123`, {
  type: 'message',
  username: 'alice',
  content: 'Hello everyone!',
  timestamp: Date.now(),
});

// Only subscribers receive the message
// No subscribers? Message is dropped (fire-and-forget)
```

## Channel API

The channels instance provides these methods:

- `subscribe(key)` - Subscribe to a channel; returns an async iterable
- `publish(key, msg)` - Broadcast to all local subscribers and trigger the `onPublish` hook, if one is configured
- `deliverRemote(key, msg)` - Deliver a message received from a remote node to local subscribers (for clusters)
- `hasSubscribers(key)` - Check whether a channel has any subscribers
- `getActiveChannels()` - List all channels that currently have subscribers
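
The bookkeeping behind these methods can be pictured with a small registry. This is a sketch under stated assumptions, not the library's code: `ChannelRegistry` is a hypothetical name, keys are plain strings, `subscribe` takes a callback for brevity (the real method returns an async iterable), and `publish` returns a delivery count purely to make the fire-and-forget behavior visible.

```typescript
// Hypothetical stand-in for illustration; not the @justscale/core implementation.
type Handler<T> = (msg: T) => void;

class ChannelRegistry<T> {
  private channels = new Map<string, Set<Handler<T>>>();

  // Callback-based for brevity. Returns a function that unsubscribes.
  subscribe(key: string, handler: Handler<T>): () => void {
    const handlers = this.channels.get(key) ?? new Set<Handler<T>>();
    this.channels.set(key, handlers);
    handlers.add(handler);
    return () => {
      // Drop the channel entry once its last subscriber is gone.
      if (handlers.delete(handler) && handlers.size === 0) {
        this.channels.delete(key);
      }
    };
  }

  // Returns how many subscribers received the message (assumption for the demo).
  publish(key: string, msg: T): number {
    const handlers = this.channels.get(key);
    if (!handlers) return 0; // no subscribers: the message is dropped
    const targets = [...handlers];
    for (const handler of targets) handler(msg);
    return targets.length;
  }

  hasSubscribers(key: string): boolean {
    return this.channels.has(key);
  }

  getActiveChannels(): string[] {
    return [...this.channels.keys()];
  }
}

const registry = new ChannelRegistry<string>();
const received: string[] = [];

const dropped = registry.publish('room:123', 'nobody is listening');
const unsubscribe = registry.subscribe('room:123', (msg) => received.push(msg));
const delivered = registry.publish('room:123', 'hello');
const activeWhileSubscribed = registry.getActiveChannels();
unsubscribe();
const stillActive = registry.hasSubscribers('room:123');
```

Here `dropped` is `0` and `delivered` is `1`, and the channel disappears from `getActiveChannels()` as soon as its only subscriber leaves.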

## Cluster Integration

For multi-node deployments, use hooks to synchronize channels across the cluster. Hooks are called when the first subscriber joins a channel on this node, when the last one leaves, and on every local publish:

`cluster-channels.ts`

```typescript
import { createChannels } from '@justscale/core';
import { ChatEvents } from './events';
import { Room } from './room.model';

// RoomMessage is the message payload type for room channels (declaration not shown).
export const RoomChannels = createChannels<RoomMessage>().withHooks({
  // Called when first subscriber joins on this node.
  // Hooks sit at the infrastructure boundary — rebuild a typed ref
  // from the channel key before doing any domain work.
  onFirstSubscriber: (key) => {
    const room = Room.ref`${key}`;
    console.log('First subscriber to', room);
    // Register interest in `room` at cluster level
  },

  // Called when last subscriber leaves on this node
  onLastUnsubscribe: (key) => {
    const room = Room.ref`${key}`;
    console.log('No more subscribers to', room);
    // Unregister `room` from cluster
  },

  // Called when publishing locally
  onPublish: (key, msg) => {
    const room = Room.ref`${key}`;
    // Broadcast to other cluster nodes via event bus
    ChatEvents.emit(`room.${msg.type}`, { room, ...msg });
  },
});
```

### Handling Remote Messages

Use `deliverRemote()` to deliver messages from other cluster nodes. This only delivers to local subscribers without re-triggering the `onPublish` hook:

`event-handler.ts`

```typescript
import { createController } from '@justscale/core';
import { ChatEvents } from './events';
import { RoomChannels } from './room-channels';

export const ChatEventController = createController('/', {
  inject: { channels: RoomChannels },
  routes: (services) => ({
    // Handle events from other cluster nodes
    onRoomEvent: ChatEvents.on('room.*').handle(({ payload }) => {
      const { room, ...message } = payload;
      // Deliver to local subscribers only. `room` is a Ref<Room>
      // carried through the event bus — channels accept refs directly.
      services.channels.deliverRemote(room, message);
    }),
  }),
});
```
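
The interplay between the hooks and `deliverRemote()` can be simulated in a single process. The sketch below is hypothetical throughout: `NodeChannels`, `Hooks`, and `makeNode` are illustrative names, a shared array stands in for the event bus, and subscribers are callbacks rather than async iterables. It shows why remote delivery must skip `onPublish`: otherwise each node would forward the message back and forth.

```typescript
// Hypothetical simulation; the names below are illustrative, not @justscale/core APIs.
type Handler<T> = (msg: T) => void;

interface Hooks<T> {
  onFirstSubscriber?: (key: string) => void;
  onLastUnsubscribe?: (key: string) => void;
  onPublish?: (key: string, msg: T) => void;
}

class NodeChannels<T> {
  private handlers = new Map<string, Set<Handler<T>>>();
  private hooks: Hooks<T>;

  constructor(hooks: Hooks<T>) {
    this.hooks = hooks;
  }

  subscribe(key: string, handler: Handler<T>): () => void {
    const existing = this.handlers.get(key);
    const set = existing ?? new Set<Handler<T>>();
    if (!existing) {
      this.handlers.set(key, set);
      this.hooks.onFirstSubscriber?.(key);
    }
    set.add(handler);
    return () => {
      if (set.delete(handler) && set.size === 0) {
        this.handlers.delete(key);
        this.hooks.onLastUnsubscribe?.(key);
      }
    };
  }

  // Local publish: deliver locally, then let the hook forward to the cluster.
  publish(key: string, msg: T): void {
    this.deliverLocal(key, msg);
    this.hooks.onPublish?.(key, msg);
  }

  // Remote delivery: local subscribers only, no onPublish, so no echo loop.
  deliverRemote(key: string, msg: T): void {
    this.deliverLocal(key, msg);
  }

  private deliverLocal(key: string, msg: T): void {
    for (const handler of this.handlers.get(key) ?? []) handler(msg);
  }
}

const log: string[] = [];
const nodes: NodeChannels<string>[] = [];

function makeNode(name: string): NodeChannels<string> {
  const node: NodeChannels<string> = new NodeChannels<string>({
    onFirstSubscriber: (key) => log.push(`${name}: first subscriber on ${key}`),
    onLastUnsubscribe: (key) => log.push(`${name}: last unsubscribe on ${key}`),
    onPublish: (key, msg) => {
      // Stand-in for an event bus: forward to every other node.
      for (const other of nodes) {
        if (other !== node) other.deliverRemote(key, msg);
      }
    },
  });
  nodes.push(node);
  return node;
}

const nodeA = makeNode('A');
const nodeB = makeNode('B');
const unsubscribeB = nodeB.subscribe('123', (msg) => log.push(`B received: ${msg}`));
nodeA.publish('123', 'hello');
unsubscribeB();
```

After this runs, `log` holds three entries: node B's first-subscriber hook, a single `B received: hello`, and node B's last-unsubscribe hook. The message crosses the simulated bus once and is not re-published by the receiving node.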

## With WebSocket

Channels integrate naturally with WebSocket for real-time streaming:

`websocket-channels.ts`

```typescript
import { createController } from '@justscale/core';
import { Ws } from '@justscale/websocket';
import { ChatService } from './chat-service';
import { Room } from './room.model';

export const ChatController = createController('/chat', {
  inject: { chat: ChatService },
  routes: (services) => ({
    room: Ws('/room/:roomId').handle(async ({ messages, send, params }) => {
      // Boundary: turn the raw path param into a typed reference.
      const room = Room.ref`${params.roomId}`;

      // Subscribe to room messages
      const subscription = services.chat.subscribe(room);

      // Forward channel messages to WebSocket client
      // This runs concurrently with message handling
      const streamMessages = async () => {
        for await (const msg of subscription) {
          send(msg);
        }
      };

      // Handle incoming WebSocket messages
      const handleMessages = async () => {
        for await (const msg of messages) {
          services.chat.broadcast(room, {
            type: 'message',
            username: msg.username,
            content: msg.content,
            timestamp: Date.now(),
          });
        }
      };

      // Run both concurrently; stop as soon as either loop ends
      try {
        await Promise.race([streamMessages(), handleMessages()]);
      } finally {
        // Always clean up the subscription, even if a loop throws
        subscription.unsubscribe();
      }
    }),
  }),
});
```

💡 **Tip:** See the **WebSocket Rooms & Broadcasting** guide for more patterns on combining channels with WebSocket.

## Type Safety

Channels are fully typed. The message type flows through to subscriptions:

`typed-channels.ts`

```typescript
import { createChannels } from '@justscale/core';
import { Room } from './room.model';

interface ChatMessage {
  type: 'text' | 'image' | 'system';
  sender: string;
  content: string;
}

const ChatChannels = createChannels<ChatMessage>();

// `channels` below is an injected instance of ChatChannels.
// Subscription is typed
const sub = channels.subscribe(Room.ref`1`);
for await (const msg of sub) {
  // msg is ChatMessage
  if (msg.type === 'text') {
    console.log(msg.sender, msg.content);
  }
}

// Publish requires correct type
channels.publish(Room.ref`1`, {
  type: 'text',
  sender: 'alice',
  content: 'Hello!',
});
```
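
If your messages differ by kind, a discriminated union makes the typing stricter still. The following is a self-contained sketch rather than library code: `TypedChannel` and `formatMessage` are hypothetical, and this `ChatMessage` is a variation on the interface above in which each `type` carries its own fields.

```typescript
// Variation on ChatMessage: each `type` has its own payload (assumption).
type ChatMessage =
  | { type: 'text'; sender: string; content: string }
  | { type: 'image'; sender: string; url: string }
  | { type: 'system'; content: string };

// Hypothetical minimal channel; only the type parameter matters here.
class TypedChannel<T> {
  private handlers = new Set<(msg: T) => void>();

  subscribe(handler: (msg: T) => void): void {
    this.handlers.add(handler);
  }

  publish(msg: T): void {
    for (const handler of this.handlers) handler(msg);
  }
}

// Checking `msg.type` narrows the union, so each branch sees only its fields.
function formatMessage(msg: ChatMessage): string {
  switch (msg.type) {
    case 'text':
      return `${msg.sender}: ${msg.content}`;
    case 'image':
      return `${msg.sender} sent ${msg.url}`;
    case 'system':
      return `[system] ${msg.content}`;
  }
}

const chat = new TypedChannel<ChatMessage>();
const lines: string[] = [];
chat.subscribe((msg) => lines.push(formatMessage(msg)));

chat.publish({ type: 'text', sender: 'alice', content: 'Hello!' });
chat.publish({ type: 'system', content: 'bob joined' });

// Never called: it exists only to show a publish the compiler rejects.
function rejectedAtCompileTime(): void {
  // @ts-expect-error 'video' is not a member of the ChatMessage union
  chat.publish({ type: 'video', content: 'clip.mp4' });
}
```

With this shape, forgetting a `case` in `formatMessage` or publishing an unknown message kind is reported by the compiler instead of surfacing at runtime.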

## Next Steps

- WebSocket Overview
- Event Bus
- Cluster
