PostgreSQL Repositories
Query methods, transactions, and advanced features
PostgreSQL repositories provide a complete set of query methods for working with your data. All methods are type-safe and integrate seamlessly with JustScale's dependency injection system.
Repository Methods
The PgRepository class provides these core methods:
Finding Entities
import { User, UserRepository } from './user-model';
const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users: repo }) => ({
    /**
     * Fetches the first page (up to 10 rows, offset 0) of users whose
     * status is 'active', newest first by `createdAt`.
     */
    findActive: async () => {
      const activeUsers = await repo.find({
        where: User.fields.status.eq('active'),
        orderBy: { createdAt: 'desc' },
        limit: 10,
        offset: 0,
      });
      return activeUsers;
    },

    /**
     * Looks a user up by primary key.
     * Resolves to Persistent<User> | undefined.
     */
    getUser: async (id: string) => {
      const match = await repo.findById(id);
      return match;
    },

    /**
     * Returns the first user whose email equals the given address.
     * Resolves to Persistent<User> | undefined.
     */
    findByEmail: async (email: string) => {
      const byEmail = User.fields.email.eq(email);
      return repo.findOne(byEmail);
    },
  }),
});
Counting and Existence
import { User, UserRepository } from './user-model';
const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users: repo }) => ({
    /** Counts the rows whose status is 'active'. Resolves to a number. */
    countActive: async () => {
      const isActive = User.fields.status.eq('active');
      return repo.count(isActive);
    },

    /** Reports whether any user already has this email. Resolves to a boolean. */
    emailExists: async (email: string) => {
      const sameEmail = User.fields.email.eq(email);
      return repo.exists(sameEmail);
    },
  }),
});
Inserting Entities
import { User, UserRepository } from './user-model';
const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users: repo }) => ({
    /**
     * Inserts a single user.
     * Resolves to Persistent<User> (with id, createdAt, updatedAt, version).
     */
    createUser: async (data: { email: string; name: string }) => {
      const created = await repo.insert(data);
      return created;
    },

    /**
     * Bulk-inserts several users in one call.
     * Resolves to Persistent<User>[].
     */
    createMany: async (data: Array<{ email: string; name: string }>) => {
      const createdRows = await repo.insertMany(data);
      return createdRows;
    },
  }),
});
Updating Entities
import { User, UserRepository } from './user-model';
const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users: repo }) => ({
    /** Changes a user's name by ID. Resolves to the updated Persistent<User>. */
    updateName: async (id: string, name: string) => {
      const updated = await repo.update(id, { name });
      return updated;
    },

    /**
     * Same update, guarded by optimistic locking: the call throws when the
     * stored version differs from `expectedVersion` (concurrent update detected).
     */
    updateWithVersion: async (id: string, name: string, expectedVersion: number) => {
      const updated = await repo.update(id, { name }, expectedVersion);
      return updated;
    },

    /** Smart save: the repository decides between insert and update. */
    saveUser: async (user: Partial<Persistent<User>>) => repo.save(user),
  }),
});
Deleting Entities
import { User, UserRepository } from './user-model';
const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users: repo }) => ({
    /**
     * Deletes one user by ID.
     * Resolves to a boolean: true if deleted, false if not found.
     */
    deleteUser: async (id: string) => {
      const wasDeleted = await repo.delete(id);
      return wasDeleted;
    },

    /**
     * Deletes every user whose status is 'inactive'.
     * Resolves to the number of deleted rows.
     */
    deleteInactive: async () => {
      const isInactive = User.fields.status.eq('inactive');
      return repo.deleteWhere(isInactive);
    },
  }),
});
Streaming Results
For large result sets, use streaming to avoid loading everything into memory:
import { User, UserRepository } from './user-model';
const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users: repo }) => ({
    /**
     * Async generator that re-yields users one at a time from the
     * repository stream, so the full result set is never held in memory.
     */
    async *processAllUsers() {
      const rows = repo.stream();
      for await (const row of rows) {
        yield row;
      }
    },

    /**
     * Consumes the table in batches of 100 and hands each batch to
     * `processBatch` (more efficient for bulk operations).
     */
    processBatches: async () => {
      const batches = repo.streamBatches({ batchSize: 100 });
      for await (const chunk of batches) {
        await processBatch(chunk);
      }
    },
  }),
});
PgModel Options
When creating a PgModel, you can configure storage details:
import { defineModel, field } from '@justscale/models';
import { createPgModel } from '@justscale/postgres';
/**
 * Storage-agnostic Product model declared with `defineModel`.
 * Only the logical fields are described here; table and column details
 * are supplied separately through `createPgModel`.
 */
class Product extends defineModel({
// String field built with .max(255) — presumably a maximum length; confirm against the field API
name: field.string().max(255),
// References to other models; the target is passed as a function rather than directly
category: field.ref(() => Category),
author: field.ref(() => User),
// Array of strings
tags: field.array(field.string()),
// Enum registered under the name 'ProductStatus' with the values 'draft' and 'published'
status: field.enum('ProductStatus', ['draft', 'published'] as const),
}) {}
/**
 * PostgreSQL storage configuration for the Product model: table name,
 * storage mode, per-column overrides, foreign-key behavior, and indexes.
 */
const PgProduct = createPgModel(Product, {
table: 'products',
storageMode: 'columnar', // 'columnar' | 'jsonb'
// Override column types and constraints
overrides: {
name: { type: 'VARCHAR(255)', unique: false },
tags: { type: 'TEXT[]' },
},
// Configure foreign key behavior
relations: {
category: { onDelete: 'SET NULL' },
author: { onDelete: 'CASCADE' },
},
// Define indexes
indexes: [
// Composite index over two fields
{ fields: ['category', 'status'], name: 'idx_product_category_status' },
// NOTE(review): in stock PostgreSQL a GIN index on a VARCHAR column has no default
// operator class (it needs e.g. gin_trgm_ops from pg_trgm) — confirm how the adapter handles this
{ fields: ['name'], using: 'gin', name: 'idx_product_name_search' },
],
});Connecting to PostgreSQL
Create a PostgreSQL client service and register it with your cluster:
import { createPostgresClient } from '@justscale/postgres';
import { createCluster } from '@justscale/cluster';
import { UserRepository } from './user-model';
import { UserService } from './user-service';
// Client service for the database; the connection string is read from
// the DATABASE_URL environment variable.
const PostgresClient = createPostgresClient({
  connectionString: process.env['DATABASE_URL'],
  // Or individual options:
  // host: 'localhost',
  // port: 5432,
  // database: 'myapp',
  // username: 'user',
  // password: 'pass',
});

// Register the client together with the repository and service that use it.
const cluster = createCluster({
  services: [
    PostgresClient,
    UserRepository,
    UserService,
  ],
});

// Start serving HTTP on port 3000.
await cluster.serve({ http: 3000 });
Transactions
The PostgreSQL client supports automatic transaction management with savepoints for nested transactions:
import { AbstractPostgresClient } from '@justscale/postgres';
import { UserRepository } from './user-model';
const UserService = createService({
  inject: {
    client: AbstractPostgresClient,
    users: UserRepository,
  },
  factory: ({ client, users }) => ({
    /**
     * Moves `amount` from one user's balance to another's inside a single
     * transaction, so both updates succeed or fail together.
     *
     * @param fromId - ID of the user to debit
     * @param toId - ID of the user to credit
     * @param amount - positive, finite amount to move
     * @throws Error if the amount is not a positive finite number, if both
     *   IDs are the same, if either user is missing, or if the sender's
     *   balance is too low. The update calls also throw on a version
     *   mismatch (concurrent update detected).
     */
    async transfer(fromId: string, toId: string, amount: number) {
      // A zero, negative or NaN amount would reverse or corrupt the transfer.
      if (!Number.isFinite(amount) || amount <= 0) {
        throw new Error('Transfer amount must be a positive number');
      }
      // With identical IDs the second update would be computed from the
      // balance read before the debit, crediting money out of nowhere.
      if (fromId === toId) {
        throw new Error('Cannot transfer to the same user');
      }

      // All operations in this transaction succeed or fail together
      await client.transaction(async () => {
        const from = await users.findById(fromId);
        const to = await users.findById(toId);
        if (!from || !to) throw new Error('User not found');
        if (from.balance < amount) throw new Error('Insufficient funds');

        // Pass the versions we read so a concurrent change to either row
        // is rejected instead of being silently overwritten.
        await users.update(fromId, { balance: from.balance - amount }, from.version);
        await users.update(toId, { balance: to.balance + amount }, to.version);
      });
    },
  }),
});
Info
The PostgreSQL adapter uses AsyncLocalStorage to automatically apply the current transaction context to all queries. You do not need to pass a transaction object around.
Nested Transactions (Savepoints)
Nested transactions are automatically handled using PostgreSQL savepoints:
import { AbstractPostgresClient } from '@justscale/postgres';
const service = createService({
  inject: { client: AbstractPostgresClient, users: UserRepository },
  factory: ({ client: db, users: repo }) => ({
    /**
     * Inserts three users, with the second insert wrapped in a nested
     * transaction (a savepoint inside the outer transaction).
     */
    complexOperation: async () => {
      await db.transaction(async () => {
        await repo.insert({ email: 'a@example.com', name: 'Alice' });

        // Nested transaction = savepoint
        await db.transaction(async () => {
          // If this throws, only Bob's insert is rolled back. The error is
          // not caught here, so it still propagates to the outer callback.
          await repo.insert({ email: 'b@example.com', name: 'Bob' });
        });

        await repo.insert({ email: 'c@example.com', name: 'Carol' });
      });
    },
  }),
});
Transaction Isolation Levels
Configure isolation levels for specific transactions:
import { AbstractPostgresClient } from '@justscale/postgres';
const service = createService({
  inject: { client: AbstractPostgresClient },
  factory: ({ client: db }) => ({
    /**
     * Runs its work in a transaction at the 'serializable' isolation level.
     * Available levels: 'read uncommitted', 'read committed',
     * 'repeatable read', 'serializable'.
     */
    criticalOperation: async () => {
      await db.transaction(
        async () => {
          // Transaction logic here
        },
        { isolationLevel: 'serializable' },
      );
    },
  }),
});
After Commit Hooks
Register callbacks to run after a transaction commits:
import { AbstractPostgresClient } from '@justscale/postgres';
const service = createService({
  inject: { client: AbstractPostgresClient, users: UserRepository },
  factory: ({ client: db, users: repo }) => ({
    /**
     * Inserts a user inside a transaction and registers an after-commit
     * hook, so the welcome email goes out only if the transaction commits.
     */
    createUser: async (email: string, name: string) => {
      await db.transaction(async () => {
        const created = await repo.insert({ email, name });

        // Deferred side effect: runs after a successful commit.
        db.afterCommit(async () => {
          await sendWelcomeEmail(created.email);
        });
      });
    },
  }),
});
Identity Map
The PostgreSQL client includes an identity map for entity caching within transactions:
import { AbstractPostgresClient } from '@justscale/postgres';
const service = createService({
  inject: { client: AbstractPostgresClient, users: UserRepository },
  factory: ({ client: db, users: repo }) => ({
    /**
     * Demonstrates the identity map: two lookups of the same ID within one
     * transaction return the same object instance.
     */
    example: async () => {
      await db.transaction(async () => {
        const first = await repo.findById('123');
        const second = await repo.findById('123');
        // first === second (same object instance from identity map)

        // Clear cache if needed
        db.clearIdentityMap();
      });
    },
  }),
});
Best Practices
- Use transactions for multi-step operations - Ensure data consistency when multiple queries must succeed or fail together
- Leverage streaming for large datasets - Avoid memory issues by processing entities one at a time or in batches
- Configure indexes appropriately - Add indexes for frequently queried fields, but avoid over-indexing
- Use optimistic locking - Pass `expectedVersion` to update/delete methods to detect concurrent modifications
- Use afterCommit hooks - For side effects like sending emails or publishing events that should only occur on successful commit