Datastar Integration
Server-Sent Events and reactive signal streaming with Datastar
The @justscale/datastar package provides seamless integration with Datastar, enabling SSE (Server-Sent Events) streaming and reactive signal management for real-time updates in your applications.
Installation
npm install @justscale/datastarBasic Setup
Import the plugin to register the Watch route factory:
import { createController } from '@justscale/core';
import { Get } from '@justscale/http';
import { Watch } from '@justscale/datastar';
export const AppController = createController('/api', {
routes: () => ({
// Regular HTTP route
items: Get('/items').handle(({ res }) => {
res.json({ items: ['item1', 'item2'] });
}),
// SSE streaming route with Datastar
updates: Watch('/items/updates', async function* () {
// Stream updates to the client
for (let i = 0; i < 10; i++) {
await new Promise(resolve => setTimeout(resolve, 1000));
yield { count: i };
}
}),
}),
});What is Datastar?
Datastar is a hypermedia-oriented framework that uses Server-Sent Events (SSE) to create reactive, real-time web applications. It allows you to:
- Stream HTML fragments and state updates to the client
- Build reactive UIs without heavy JavaScript frameworks
- Maintain server-side state and push updates in real-time
- Handle long-lived connections for live data feeds
The Watch Route Factory
The Watch factory creates SSE endpoints that continuously stream data to connected clients:
import { Watch } from '@justscale/datastar';
const NotificationsController = createController('/notifications', {
routes: () => ({
// Generator-based streaming
stream: Watch('/', async function* () {
while (true) {
const notification = await getNextNotification();
yield { notification };
await new Promise(resolve => setTimeout(resolve, 1000));
}
}),
}),
});Dirty-Tracking with SignalRepository
Inside a Watch handler, createSignalRepositorywraps the route's stream with a validated, dirty-aware model. Mutate fields freely — calling save() only emits the signals that actually changed.
import { createController } from '@justscale/core';
import { Watch, createSignalRepository } from '@justscale/datastar';
import { z } from 'zod';
const ItemsSchema = z.object({
items: z.array(z.string()).default([]),
count: z.number().default(0),
});
async function waitForNextItem(): Promise<string> {
// Stand-in for whatever produces the next item (a queue, a channel, etc).
return new Promise((r) => setTimeout(() => r(`item-${Date.now()}`), 100));
}
export const ItemsController = createController('/items', {
inject: {},
routes: () => ({
// Each connection gets its own repository bound to that stream.
watch: Watch('/watch', async function* ({ stream, signals }) {
const repo = createSignalRepository(ItemsSchema, stream);
const model = repo.create(signals);
while (true) {
const next = await waitForNextItem();
model.items = [...model.items, next];
model.count = model.items.length;
repo.save(model); // emits only items + count — nothing else changed
yield {}; // keep the generator running; repo handled the emit
}
}),
}),
});HTML Streaming
Stream HTML fragments for hypermedia-driven updates:
import { html } from '@justscale/datastar';
import { Watch } from '@justscale/datastar';
export const DashboardController = createController('/dashboard', {
routes: () => ({
metrics: Watch('/metrics', async function* () {
while (true) {
const metrics = await fetchMetrics();
// Stream HTML fragment
yield html`
<div id="metrics">
<h2>CPU: ${metrics.cpu}%</h2>
<h2>Memory: ${metrics.memory}%</h2>
</div>
`;
await new Promise(resolve => setTimeout(resolve, 5000));
}
}),
}),
});Info
html template tag escapes variables by default for security. Use rawHtml if you need to render unescaped HTML (use carefully).Stream Context
Watch routes receive a special context with streaming utilities:
Watch('/feed', async function* ({ stream, req, app }) {
// stream: SSE writing utilities
// req: HTTP request object
// app: Application instance
// Plus any injected dependencies
for await (const event of getEvents()) {
yield { event };
}
})Integration with Services
Inject services into Watch routes just like regular routes:
import { defineService } from '@justscale/core';
class EventBusService extends defineService({
inject: {},
factory: () => {
const subscribers = new Set<(event: any) => void>();
return {
subscribe: (callback: (event: any) => void) => {
subscribers.add(callback);
return () => subscribers.delete(callback);
},
publish: (event: any) => {
subscribers.forEach(sub => sub(event));
},
};
},
}) {}Client-Side Usage
On the client side, use Datastar's attributes to connect to your Watch endpoints:
<!DOCTYPE html>
<html>
<head>
<script type="module" src="https://cdn.jsdelivr.net/npm/@sudodevnull/datastar"></script>
</head>
<body>
<!-- Subscribe to SSE updates -->
<div data-on-load="@sse('/api/items/watch')">
<div data-text="$items.length">0</div>
<ul>
<template data-for="item in $items">
<li data-text="$item"></li>
</template>
</ul>
</div>
</body>
</html>SignalRepository API
Create a repository for a stream
Takes a Zod schema and the Datastar stream from the Watch context. The returned repository validates inputs and tracks mutations.
const repo = createSignalRepository(
z.object({ count: z.number().default(0) }),
stream,
);Create a model from current signals
// Hydrate from the signals the client sent (or undefined for schema defaults).
const model = repo.create(signals);
model.count; // typed as numberEmit only what changed
model.count = model.count + 1;
const changed = repo.save(model); // true if anything was dirty
// → mergeSignals({ count: 2 }) — other fields are untouched on the client.Force-emit every field
repo.saveAll(model); // sends every field, regardless of dirty stateUse Cases
- Live dashboards - Stream real-time metrics and stats
- Notifications - Push notifications to connected clients
- Chat applications - Real-time message delivery
- Live updates - Broadcast database changes to users
- Progress tracking - Stream long-running job progress
- Collaborative editing - Sync state across multiple clients
Best Practices
- Handle cleanup - Use try/finally to clean up subscriptions
- Rate limit updates - Don't stream too frequently
- Use heartbeats - Send periodic keep-alive messages
- Handle reconnection - Client should retry on disconnect
- Validate data - Sanitize HTML content before streaming