Datastar Integration

Server-Sent Events and reactive signal streaming with Datastar

The @justscale/datastar package integrates JustScale with Datastar. It adds SSE (Server-Sent Events) streaming routes and reactive signal management for real-time updates in your applications.

Installation

Bash
npm install @justscale/datastar

Basic Setup

Import the plugin to register the Watch route factory:

src/controllers/app.ts (TypeScript)
import { createController } from '@justscale/core';
import { Get } from '@justscale/http';
import { Watch } from '@justscale/datastar';

export const AppController = createController('/api', {
  routes: () => ({
    // Regular HTTP route
    items: Get('/items').handle(({ res }) => {
      res.json({ items: ['item1', 'item2'] });
    }),

    // SSE streaming route with Datastar
    updates: Watch('/items/updates', async function* () {
      // Stream updates to the client
      for (let i = 0; i < 10; i++) {
        await new Promise(resolve => setTimeout(resolve, 1000));
        yield { count: i };
      }
    }),
  }),
});

What is Datastar?

Datastar is a hypermedia-oriented framework that uses Server-Sent Events (SSE) to create reactive, real-time web applications. It allows you to:

  • Stream HTML fragments and state updates to the client
  • Build reactive UIs without heavy JavaScript frameworks
  • Maintain server-side state and push updates in real-time
  • Handle long-lived connections for live data feeds

The Watch Route Factory

The Watch factory creates SSE endpoints that continuously stream data to connected clients:

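notifications-controller.ts (TypeScript)
import { createController } from '@justscale/core';
import { Watch } from '@justscale/datastar';

// getNextNotification() is a placeholder for your own notification source.
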
const NotificationsController = createController('/notifications', {
  routes: () => ({
    // Generator-based streaming
    stream: Watch('/', async function* () {
      while (true) {
        const notification = await getNextNotification();
        yield { notification };
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }),
  }),
});
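
For orientation, each value a Watch generator yields ultimately reaches the browser as a Server-Sent Events frame. The sketch below shows only the standard SSE wire format. The `formatSseEvent` helper and the `example-signals` event name are hypothetical and not part of @justscale/datastar, whose actual event names and payload layout may differ.

```typescript
// Hypothetical helper (not a library API): serialize one event as an SSE frame.
// A frame is a block of "field: value" lines terminated by a blank line.
function formatSseEvent(event: string, data: Record<string, unknown>): string {
  // JSON.stringify without indentation produces a single line,
  // so one "data:" line is enough here.
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

const frame = formatSseEvent('example-signals', { count: 1 });
console.log(frame);
```

The trailing blank line is what tells the browser's EventSource parser that the frame is complete.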

Dirty-Tracking with SignalRepository

Inside a Watch handler, createSignalRepository wraps the route's stream with a validated, dirty-aware model. Mutate fields freely: calling save() emits only the signals that actually changed.

src/items-controller.ts (TypeScript)
import { createController } from '@justscale/core';
import { Watch, createSignalRepository } from '@justscale/datastar';
import { z } from 'zod';

const ItemsSchema = z.object({
  items: z.array(z.string()).default([]),
  count: z.number().default(0),
});

async function waitForNextItem(): Promise<string> {
  // Stand-in for whatever produces the next item (a queue, a channel, etc).
  return new Promise((r) => setTimeout(() => r(`item-${Date.now()}`), 100));
}

export const ItemsController = createController('/items', {
  inject: {},
  routes: () => ({
    // Each connection gets its own repository bound to that stream.
    watch: Watch('/watch', async function* ({ stream, signals }) {
      const repo = createSignalRepository(ItemsSchema, stream);
      const model = repo.create(signals);

      while (true) {
        const next = await waitForNextItem();
        model.items = [...model.items, next];
        model.count = model.items.length;
        repo.save(model);      // emits only the dirty fields (here: items and count)
        yield {};               // keep the generator running; repo handled the emit
      }
    }),
  }),
});
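
To make the dirty-tracking idea concrete, here is a minimal standalone sketch of one way to compute "only what changed". It is not the implementation behind createSignalRepository: the `createDirtyTracker` helper, the snapshot-by-JSON comparison, and the extra `title` field are all assumptions made for illustration.

```typescript
// Hypothetical stand-in for the dirty-tracking idea; not the library's code.
type Signals = Record<string, unknown>;

function createDirtyTracker(initial: Signals) {
  // Remember the last emitted value of each field as JSON, so arrays and
  // nested objects are compared by value rather than by reference.
  const lastSent = new Map<string, string>();
  for (const [key, value] of Object.entries(initial)) {
    lastSent.set(key, JSON.stringify(value));
  }

  return {
    // Return only the fields that differ from the snapshot, then update it.
    diff(model: Signals): Signals {
      const changed: Signals = {};
      for (const [key, value] of Object.entries(model)) {
        const serialized = JSON.stringify(value);
        if (lastSent.get(key) !== serialized) {
          changed[key] = value;
          lastSent.set(key, serialized);
        }
      }
      return changed;
    },
  };
}

const model = { items: [] as string[], count: 0, title: 'Inbox' };
const tracker = createDirtyTracker(model);

model.items = [...model.items, 'item-1'];
model.count = model.items.length;

const patch = tracker.diff(model);
console.log(patch); // contains items and count, but not title
```

A second `tracker.diff(model)` call with no further mutations returns an empty object, which mirrors the idea of save() having nothing to emit.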

HTML Streaming

Stream HTML fragments for hypermedia-driven updates:

dashboard-controller.ts (TypeScript)
import { createController } from '@justscale/core';
import { Watch, html } from '@justscale/datastar';

// fetchMetrics() is a placeholder for your own metrics source.

export const DashboardController = createController('/dashboard', {
  routes: () => ({
    metrics: Watch('/metrics', async function* () {
      while (true) {
        const metrics = await fetchMetrics();

        // Stream HTML fragment
        yield html`
          <div id="metrics">
            <h2>CPU: ${metrics.cpu}%</h2>
            <h2>Memory: ${metrics.memory}%</h2>
          </div>
        `;

        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }),
  }),
});

Info: The html template tag escapes interpolated values by default for security. Use rawHtml only when you need to render unescaped HTML, and only with markup you trust.
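
As an illustration of what "escapes by default" means for a template tag, here is a small self-contained sketch. The `safeHtml` and `escapeHtml` names are hypothetical, and the `html` export from @justscale/datastar may be implemented differently.

```typescript
// Hypothetical illustration of an escaping template tag.
const ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#39;',
};

function escapeHtml(value: unknown): string {
  return String(value).replace(/[&<>"']/g, (ch) => ESCAPES[ch] ?? ch);
}

function safeHtml(strings: TemplateStringsArray, ...values: unknown[]): string {
  // Static template parts pass through unchanged; interpolated values are escaped.
  return strings.reduce(
    (out, part, i) => out + part + (i < values.length ? escapeHtml(values[i]) : ''),
    '',
  );
}

const userInput = '<script>alert(1)</script>';
const fragment = safeHtml`<p id="greeting">Hello, ${userInput}</p>`;
console.log(fragment);
// <p id="greeting">Hello, &lt;script&gt;alert(1)&lt;/script&gt;</p>
```

Only the interpolated value is escaped. The markup written directly in the template stays intact, which is why untrusted input should always arrive through an interpolation.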

Stream Context

Watch routes receive a special context with streaming utilities:

feed.ts (TypeScript)
Watch('/feed', async function* ({ stream, req, app }) {
  // stream: SSE writing utilities
  // req: HTTP request object
  // app: Application instance
  // Plus any injected dependencies

  for await (const event of getEvents()) {
    yield { event };
  }
})

Integration with Services

Inject services into Watch routes just like regular routes. The service below is a simple in-memory event bus that a Watch route could subscribe to:

event-bus-service.ts (TypeScript)
import { defineService } from '@justscale/core';

class EventBusService extends defineService({
  inject: {},
  factory: () => {
    const subscribers = new Set<(event: any) => void>();

    return {
      subscribe: (callback: (event: any) => void) => {
        subscribers.add(callback);
        return () => subscribers.delete(callback);
      },
      publish: (event: any) => {
        subscribers.forEach(sub => sub(event));
      },
    };
  },
}) {}
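
The service above exposes a callback-style subscribe, while a Watch handler is an async generator. One possible way to bridge the two is sketched below. The `fromSubscription` helper is hypothetical and not part of JustScale, and the demo uses a plain in-memory bus with the same subscribe/publish shape rather than an injected service.

```typescript
type Unsubscribe = () => void;
type Subscribe<T> = (callback: (event: T) => void) => Unsubscribe;

// Hypothetical helper: expose a callback subscription as an async generator.
async function* fromSubscription<T>(subscribe: Subscribe<T>): AsyncGenerator<T> {
  const queue: T[] = [];
  let wake: (() => void) | null = null;

  const unsubscribe = subscribe((event) => {
    queue.push(event);
    wake?.(); // resume the generator if it is waiting
    wake = null;
  });

  try {
    while (true) {
      while (queue.length > 0) {
        yield queue.shift() as T;
      }
      // Nothing buffered: wait until the next publish.
      await new Promise<void>((resolve) => {
        wake = resolve;
      });
    }
  } finally {
    // Runs when the consumer stops iterating (for example on disconnect).
    unsubscribe();
  }
}

async function demo(): Promise<{ seen: string[]; remaining: number }> {
  // Minimal in-memory bus with the same shape as the service above.
  const subscribers = new Set<(event: string) => void>();
  const subscribe: Subscribe<string> = (cb) => {
    subscribers.add(cb);
    return () => {
      subscribers.delete(cb);
    };
  };
  const publish = (event: string) => subscribers.forEach((cb) => cb(event));

  const seen: string[] = [];
  setTimeout(() => {
    publish('a');
    publish('b');
  }, 0);

  for await (const event of fromSubscription(subscribe)) {
    seen.push(event);
    if (seen.length === 2) break; // leaving the loop triggers the finally block
  }
  return { seen, remaining: subscribers.size };
}

demo().then((result) => console.log(result));
```

Inside a Watch handler the same pattern would become a `for await` loop that yields each event. The try/finally ensures the subscriber is removed once the stream ends.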

Client-Side Usage

On the client side, use Datastar's attributes to connect to your Watch endpoints:

HTML
<!DOCTYPE html>
<html>
<head>
  <script type="module" src="https://cdn.jsdelivr.net/npm/@sudodevnull/datastar"></script>
</head>
<body>
  <!-- Subscribe to SSE updates -->
  <div data-on-load="@sse('/api/items/watch')">
    <div data-text="$items.length">0</div>
    <ul>
      <template data-for="item in $items">
        <li data-text="$item"></li>
      </template>
    </ul>
  </div>
</body>
</html>

SignalRepository API

Create a repository for a stream

Takes a Zod schema and the Datastar stream from the Watch context. The returned repository validates inputs and tracks mutations.

create.ts (TypeScript)
const repo = createSignalRepository(
  z.object({ count: z.number().default(0) }),
  stream,
);

Create a model from current signals

hydrate.ts (TypeScript)
// Hydrate from the signals the client sent (or undefined for schema defaults).
const model = repo.create(signals);
model.count;      // typed as number

Emit only what changed

save.ts (TypeScript)
model.count = model.count + 1;
const changed = repo.save(model);   // true if anything was dirty
// e.g. if count was 1 → mergeSignals({ count: 2 }); other fields are untouched on the client.

Force-emit every field

save-all.ts (TypeScript)
repo.saveAll(model);   // sends every field, regardless of dirty state

Use Cases

  • Live dashboards - Stream real-time metrics and stats
  • Notifications - Push notifications to connected clients
  • Chat applications - Real-time message delivery
  • Live updates - Broadcast database changes to users
  • Progress tracking - Stream long-running job progress
  • Collaborative editing - Sync state across multiple clients

Best Practices

  • Handle cleanup - Use try/finally to clean up subscriptions
  • Rate limit updates - Throttle or batch updates instead of emitting on every change
  • Use heartbeats - Send periodic keep-alive messages
  • Handle reconnection - Client should retry on disconnect
  • Validate data - Sanitize HTML content before streaming
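
As one example of the rate-limiting advice above, the sketch below gates emits so that at most one goes out per interval. The `createEmitLimiter` helper is hypothetical and not part of @justscale/datastar. Its clock is injectable so the behaviour can be checked without real timers.

```typescript
// Hypothetical helper: allow at most one emit per `minIntervalMs`.
function createEmitLimiter(minIntervalMs: number, now: () => number = Date.now) {
  let lastEmit = Number.NEGATIVE_INFINITY;
  return function shouldEmit(): boolean {
    const t = now();
    if (t - lastEmit < minIntervalMs) return false; // too soon, skip this update
    lastEmit = t;
    return true;
  };
}

// Drive the limiter with a fake clock to show which updates pass.
let fakeTime = 0;
const shouldEmit = createEmitLimiter(1000, () => fakeTime);

const decisions: boolean[] = [];
for (const t of [0, 200, 999, 1000, 1500, 2000]) {
  fakeTime = t;
  decisions.push(shouldEmit());
}
console.log(decisions); // [true, false, false, true, false, true]
```

Skipped updates are dropped in this sketch. If the latest state must always reach the client, keep mutating the model while the limiter says no and emit the accumulated changes on the next allowed tick.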