<!-- Markdown mirror of https://justscale.sh/docs/features/datastar -->

# Datastar Integration

Server-Sent Events and reactive signal streaming with Datastar

The `@justscale/datastar` package provides seamless integration with [Datastar](https://data-star.dev/), enabling SSE (Server-Sent Events) streaming and reactive signal management for real-time updates in your applications.

## Installation

Bash

```bash
npm install @justscale/datastar
```

## Basic Setup

Import the plugin to register the `Watch` route factory:

src/controllers/app.tsTypeScript

```typescript
import { createController } from '@justscale/core';
import { Get } from '@justscale/http';
import { Watch } from '@justscale/datastar';

export const AppController = createController('/api', {
  routes: () => ({
    // Regular HTTP route
    items: Get('/items').handle(({ res }) => {
      res.json({ items: ['item1', 'item2'] });
    }),

    // SSE streaming route with Datastar
    updates: Watch('/items/updates', async function* () {
      // Stream updates to the client
      for (let i = 0; i < 10; i++) {
        await new Promise(resolve => setTimeout(resolve, 1000));
        yield { count: i };
      }
    }),
  }),
});
```

## What is Datastar?

Datastar is a hypermedia-oriented framework that uses Server-Sent Events (SSE) to create reactive, real-time web applications. It allows you to:

- Stream HTML fragments and state updates to the client
- Build reactive UIs without heavy JavaScript frameworks
- Maintain server-side state and push updates in real-time
- Handle long-lived connections for live data feeds

## The Watch Route Factory

The `Watch` factory creates SSE endpoints that continuously stream data to connected clients:

notifications-controller.tsTypeScript

```typescript
import { Watch } from '@justscale/datastar';

const NotificationsController = createController('/notifications', {
  routes: () => ({
    // Generator-based streaming
    stream: Watch('/', async function* () {
      while (true) {
        const notification = await getNextNotification();
        yield { notification };
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }),
  }),
});
```

## Dirty-Tracking with SignalRepository

Inside a `Watch` handler, `createSignalRepository`wraps the route's stream with a validated, dirty-aware model. Mutate fields freely — calling `save()` only emits the signals that actually changed.

src/items-controller.tsTypeScript

```typescript
import { createController } from '@justscale/core';
import { Watch, createSignalRepository } from '@justscale/datastar';
import { z } from 'zod';

const ItemsSchema = z.object({
  items: z.array(z.string()).default([]),
  count: z.number().default(0),
});

async function waitForNextItem(): Promise<string> {
  // Stand-in for whatever produces the next item (a queue, a channel, etc).
  return new Promise((r) => setTimeout(() => r(`item-${Date.now()}`), 100));
}

export const ItemsController = createController('/items', {
  inject: {},
  routes: () => ({
    // Each connection gets its own repository bound to that stream.
    watch: Watch('/watch', async function* ({ stream, signals }) {
      const repo = createSignalRepository(ItemsSchema, stream);
      const model = repo.create(signals);

      while (true) {
        const next = await waitForNextItem();
        model.items = [...model.items, next];
        model.count = model.items.length;
        repo.save(model);      // emits only items + count — nothing else changed
        yield {};               // keep the generator running; repo handled the emit
      }
    }),
  }),
});
```

## HTML Streaming

Stream HTML fragments for hypermedia-driven updates:

dashboard-controller.tsTypeScript

```typescript
import { html } from '@justscale/datastar';
import { Watch } from '@justscale/datastar';

export const DashboardController = createController('/dashboard', {
  routes: () => ({
    metrics: Watch('/metrics', async function* () {
      while (true) {
        const metrics = await fetchMetrics();

        // Stream HTML fragment
        yield html`
          <div id="metrics">
            <h2>CPU: ${metrics.cpu}%</h2>
            <h2>Memory: ${metrics.memory}%</h2>
          </div>
        `;

        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }),
  }),
});
```

ℹ️Info

The `html` template tag escapes variables by default for security. Use `rawHtml` if you need to render unescaped HTML (use carefully).

## Stream Context

Watch routes receive a special context with streaming utilities:

feed.tsTypeScript

```typescript
Watch('/feed', async function* ({ stream, req, app }) {
  // stream: SSE writing utilities
  // req: HTTP request object
  // app: Application instance
  // Plus any injected dependencies

  for await (const event of getEvents()) {
    yield { event };
  }
})
```

## Integration with Services

Inject services into Watch routes just like regular routes:

Files

event-bus-service.tsevents-controller.ts

event-bus-service.tsevents-controller.ts

event-bus-service.tsTypeScript

```typescript
import { defineService } from '@justscale/core';

class EventBusService extends defineService({
  inject: {},
  factory: () => {
    const subscribers = new Set<(event: any) => void>();

    return {
      subscribe: (callback: (event: any) => void) => {
        subscribers.add(callback);
        return () => subscribers.delete(callback);
      },
      publish: (event: any) => {
        subscribers.forEach(sub => sub(event));
      },
    };
  },
}) {}
```

## Client-Side Usage

On the client side, use Datastar's attributes to connect to your Watch endpoints:

HTML

```html
<!DOCTYPE html>
<html>
<head>
  <script type="module" src="https://cdn.jsdelivr.net/npm/@sudodevnull/datastar"></script>
</head>
<body>
  <!-- Subscribe to SSE updates -->
  <div data-on-load="@sse('/api/items/watch')">
    <div data-text="$items.length">0</div>
    <ul>
      <template data-for="item in $items">
        <li data-text="$item"></li>
      </template>
    </ul>
  </div>
</body>
</html>
```

## SignalRepository API

### Create a repository for a stream

Takes a Zod schema and the Datastar stream from the Watch context. The returned repository validates inputs and tracks mutations.

create.tsTypeScript

```typescript
const repo = createSignalRepository(
  z.object({ count: z.number().default(0) }),
  stream,
);
```

### Create a model from current signals

hydrate.tsTypeScript

```typescript
// Hydrate from the signals the client sent (or undefined for schema defaults).
const model = repo.create(signals);
model.count;      // typed as number
```

### Emit only what changed

save.tsTypeScript

```typescript
model.count = model.count + 1;
const changed = repo.save(model);   // true if anything was dirty
// → mergeSignals({ count: 2 }) — other fields are untouched on the client.
```

### Force-emit every field

save-all.tsTypeScript

```typescript
repo.saveAll(model);   // sends every field, regardless of dirty state
```

## Use Cases

- Live dashboards - Stream real-time metrics and stats
- Notifications - Push notifications to connected clients
- Chat applications - Real-time message delivery
- Live updates - Broadcast database changes to users
- Progress tracking - Stream long-running job progress
- Collaborative editing - Sync state across multiple clients

## Best Practices

- Handle cleanup - Use try/finally to clean up subscriptions
- Rate limit updates - Don't stream too frequently
- Use heartbeats - Send periodic keep-alive messages
- Handle reconnection - Client should retry on disconnect
- Validate data - Sanitize HTML content before streaming

## Next Steps

- Features Overview
- Request Handling
- Testing
