Server-Sent Events
Real-time streaming with SSE routes
The @justscale/sse package provides an SSE() route factory for streaming events to HTTP clients. SSE routes use the same fluent builder API as HTTP routes (.use(), .guard(), .types()), but the handler is an async generator that yields events.
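To make the "async generator that yields events" idea concrete, here is a minimal sketch of how yielded objects could be turned into text/event-stream frames. It is illustrative only: SSEEventLike, formatEvent, and toFrames are hypothetical names, not exports of @justscale/sse, and the package's actual serializer may differ. The field names (event, id, data, retry) and the treatment of strings versus objects follow the examples in this guide.

```typescript
// Hypothetical event shape, modeled on the fields used in this guide.
interface SSEEventLike {
  event?: string;
  id?: string;
  data: unknown;
  retry?: number;
}

// Serialize one event into a text/event-stream frame.
// Assumption: strings are sent as-is and everything else is JSON-encoded.
function formatEvent(e: SSEEventLike): string {
  const lines: string[] = [];
  if (e.event !== undefined) lines.push(`event: ${e.event}`);
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  // JSON.stringify returns undefined for values such as `undefined`;
  // this sketch falls back to 'null' in that case.
  const payload =
    typeof e.data === 'string' ? e.data : JSON.stringify(e.data) ?? 'null';
  // The wire format needs one "data:" line per line of payload.
  for (const line of payload.split('\n')) lines.push(`data: ${line}`);
  if (e.retry !== undefined) lines.push(`retry: ${e.retry}`);
  // A blank line terminates the frame.
  return lines.join('\n') + '\n\n';
}

// Adapt any async iterable of events (such as a handler's generator)
// into an async iterable of wire-format frames.
async function* toFrames(
  events: AsyncIterable<SSEEventLike>,
): AsyncGenerator<string> {
  for await (const e of events) yield formatEvent(e);
}
```

Writing each frame from toFrames to a response served with Content-Type: text/event-stream produces the kind of stream a browser's EventSource can read.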
Installation
npm install @justscale/sse
Then import the package in your app entry point to register the SSE request handler:
import '@justscale/sse';
Basic SSE Route
Create an SSE endpoint with the SSE() factory. The handler is an async generator that yields SSEEvent objects:
import { createController } from '@justscale/core';
import { SSE } from '@justscale/sse';
import { NotificationService } from './notification-service';

export const NotificationController = createController('/notifications', {
  inject: { notifications: NotificationService },
  routes: ({ notifications }) => ({
    stream: SSE('/stream')
      .handle(async function* () {
        yield { event: 'connected', data: { status: 'ok' } };
        for await (const notification of notifications.subscribe()) {
          yield { event: 'notification', data: notification };
        }
      }),
  }),
});
SSE Event Format
Each yielded object maps to SSE fields:
// All fields except 'data' are optional
yield {
  event: 'update',    // event: update
  id: '42',           // id: 42
  data: { count: 1 }, // data: {"count":1}
  retry: 5000,        // retry: 5000
};
// Minimal: just data
yield { data: 'hello' }; // data: hello
Path Parameters
SSE routes infer path parameters from the path string, just like HTTP routes:
import { SSE } from '@justscale/sse';

// params.roomId is typed as 'string', inferred from the path
SSE('/:roomId/events')
  .handle(async function* ({ params }) {
    yield { data: { room: params.roomId } };
    // ...stream room events
  });
Typed Parameters with .types()
Use .types() to transform path parameters from strings into model references. This is the same API as HTTP routes:
import { createController } from '@justscale/core';
import { ModelRepository } from '@justscale/core/models';
import { SSE } from '@justscale/sse';
import { Ticket } from './domain';

export const TicketController = createController('/tickets', {
  inject: { tickets: ModelRepository.of(Ticket) },
  routes: ({ tickets }) => ({
    events: SSE('/:ticket/events')
      .types({ Ticket })
      .handle(async function* ({ params }) {
        // params.ticket is Reference<Ticket>; await to resolve
        const ticket = await params.ticket;
        if (!ticket) {
          yield { event: 'error', data: { message: 'Not found' } };
          return;
        }
        yield { event: 'connected', data: { subject: ticket.subject } };
        for await (const event of ticket.events) {
          yield { event: event.type, data: event.data };
        }
      }),
  }),
});
Info
The .types() method uses the same matching rules as HTTP routes: .types({ Ticket }) matches :ticket, :Ticket, and :ticketRef.
Middleware and Guards
SSE routes support the same .use() and .guard() chain as HTTP routes. Middleware context accumulates and is available in the generator:
import { SSE } from '@justscale/sse';
import { auth } from '@justscale/auth';

SSE('/dashboard/events')
  .use(auth)
  .guard(({ user }) => user.role === 'admin')
  .handle(async function* ({ user }) {
    yield { event: 'welcome', data: { name: user.name } };
    // Stream admin dashboard updates...
  });
Client Disconnect
When the client disconnects, the generator is automatically cleaned up. Use try/finally to release resources:
import { SSE } from '@justscale/sse';

// eventBus is assumed to be an application-level event source already in scope
SSE('/events')
  .handle(async function* ({ aborted }) {
    const subscription = eventBus.subscribe();
    try {
      for await (const event of subscription) {
        yield { event: event.type, data: event.payload };
      }
    } finally {
      // Runs when the client disconnects
      subscription.unsubscribe();
    }
  });
The aborted promise resolves when the client disconnects, which is useful for cancelling other async operations.
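One way to use aborted for cancellation is to mirror it onto a standard AbortSignal, which timers and other async APIs can then observe. The sketch below is an assumption-laden illustration rather than part of @justscale/sse: signalFromAborted, delay, and heartbeats are hypothetical helpers, and aborted is modeled as a plain promise whose resolved value is ignored.

```typescript
// Hypothetical helper: mirrors an `aborted` promise onto an AbortSignal.
function signalFromAborted(aborted: Promise<unknown>): AbortSignal {
  const controller = new AbortController();
  void aborted.then(() => controller.abort());
  return controller.signal;
}

// Cancellable delay: resolves true after `ms`,
// or false as soon as the signal aborts.
function delay(ms: number, signal: AbortSignal): Promise<boolean> {
  return new Promise((resolve) => {
    if (signal.aborted) {
      resolve(false);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      resolve(false);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve(true);
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Stand-in for an SSE handler body: yields a heartbeat every `intervalMs`
// and stops waiting as soon as `aborted` resolves.
async function* heartbeats(aborted: Promise<unknown>, intervalMs: number) {
  const signal = signalFromAborted(aborted);
  let seq = 0;
  while (await delay(intervalMs, signal)) {
    yield { event: 'heartbeat', data: { seq: seq++ } };
  }
}
```

The same signal can be handed to any API that accepts an AbortSignal, for example fetch(url, { signal }), so that in-flight work stops when the client goes away.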
Combining with HTTP Routes
SSE routes live alongside regular HTTP routes in the same controller:
import { createController } from '@justscale/core';
import { Get, Post } from '@justscale/http';
import { SSE } from '@justscale/sse';
import { Order, OrderService } from './domain';

export const OrderController = createController('/orders', {
  inject: { orders: OrderService },
  routes: ({ orders }) => ({
    // Regular HTTP routes
    list: Get('/').handle(async ({ res }) => {
      res.json(await orders.list());
    }),
    // SSE route for real-time updates
    events: SSE('/:order/events')
      .types({ Order })
      .handle(async function* ({ params }) {
        const order = await params.order;
        if (!order) return;
        for await (const update of order.events) {
          yield { event: update.type, data: update.data };
        }
      }),
  }),
});