PostgreSQL Repositories

Query methods, transactions, and advanced features

PostgreSQL repositories provide a complete set of query methods for working with your data. All methods are type-safe and integrate seamlessly with JustScale's dependency injection system.

Repository Methods

The PgRepository class provides these core methods:

Finding Entities

finding-entities.tsTypeScript
import { User, UserRepository } from './user-model';

const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users }) => ({
    // Find all matching a condition
    async findActive() {
      return users.find({
        where: User.fields.status.eq('active'),
        orderBy: { createdAt: 'desc' },
        limit: 10,
        offset: 0,
      });
    },

    // Find by primary key
    async getUser(id: string) {
      return users.findById(id);
      // Returns: Persistent<User> | undefined
    },

    // Find first match
    async findByEmail(email: string) {
      return users.findOne(User.fields.email.eq(email));
      // Returns: Persistent<User> | undefined
    },
  }),
});

Counting and Existence

counting.tsTypeScript
import { User, UserRepository } from './user-model';

const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users }) => ({
    // Count matching rows
    async countActive() {
      return users.count(User.fields.status.eq('active'));
      // Returns: number
    },

    // Check existence
    async emailExists(email: string) {
      return users.exists(User.fields.email.eq(email));
      // Returns: boolean
    },
  }),
});

Inserting Entities

inserting.tsTypeScript
import { User, UserRepository } from './user-model';

const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users }) => ({
    // Insert single entity
    async createUser(data: { email: string; name: string }) {
      return users.insert(data);
      // Returns: Persistent<User> (with id, createdAt, updatedAt, version)
    },

    // Insert many (bulk insert)
    async createMany(data: Array<{ email: string; name: string }>) {
      return users.insertMany(data);
      // Returns: Persistent<User>[]
    },
  }),
});

Updating Entities

updating.tsTypeScript
import { User, UserRepository } from './user-model';

const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users }) => ({
    // Update by ID
    async updateName(id: string, name: string) {
      return users.update(id, { name });
      // Returns: Persistent<User>
    },

    // Update with optimistic locking
    async updateWithVersion(id: string, name: string, expectedVersion: number) {
      return users.update(id, { name }, expectedVersion);
      // Throws if version mismatch (concurrent update detected)
    },

    // Smart save (insert or update)
    async saveUser(user: Partial<Persistent<User>>) {
      return users.save(user);
    },
  }),
});

Deleting Entities

deleting.tsTypeScript
import { User, UserRepository } from './user-model';

const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users }) => ({
    // Delete by ID
    async deleteUser(id: string) {
      return users.delete(id);
      // Returns: boolean (true if deleted, false if not found)
    },

    // Delete matching a condition
    async deleteInactive() {
      return users.deleteWhere(User.fields.status.eq('inactive'));
      // Returns: number (count of deleted rows)
    },
  }),
});

Streaming Results

For large result sets, use streaming to avoid loading everything into memory:

streaming.tsTypeScript
import { User, UserRepository } from './user-model';

const UserService = createService({
  inject: { users: UserRepository },
  factory: ({ users }) => ({
    // Stream entities one at a time
    async *processAllUsers() {
      for await (const user of users.stream()) {
        yield user;
        // Process each user without loading all into memory
      }
    },

    // Stream in batches (more efficient for bulk operations)
    async processBatches() {
      for await (const batch of users.streamBatches({ batchSize: 100 })) {
        await processBatch(batch);  // Process 100 users at a time
      }
    },
  }),
});

PgModel Options

When creating a PgModel, you can configure storage details:

pg-model-options.tsTypeScript
import { defineModel, field } from '@justscale/models';
import { createPgModel } from '@justscale/postgres';

class Product extends defineModel({
  name: field.string().max(255),
  category: field.ref(() => Category),
  author: field.ref(() => User),
  tags: field.array(field.string()),
  status: field.enum('ProductStatus', ['draft', 'published'] as const),
}) {}

const PgProduct = createPgModel(Product, {
  table: 'products',
  storageMode: 'columnar',  // 'columnar' | 'jsonb'

  // Override column types and constraints
  overrides: {
    name: { type: 'VARCHAR(255)', unique: false },
    tags: { type: 'TEXT[]' },
  },

  // Configure foreign key behavior
  relations: {
    category: { onDelete: 'SET NULL' },
    author: { onDelete: 'CASCADE' },
  },

  // Define indexes
  indexes: [
    { fields: ['category', 'status'], name: 'idx_product_category_status' },
    { fields: ['name'], using: 'gin', name: 'idx_product_name_search' },
  ],
});

Connecting to PostgreSQL

Create a PostgreSQL client service and register it with your cluster:

database-setup.tsTypeScript
import { createPostgresClient } from '@justscale/postgres';
import { createCluster } from '@justscale/cluster';
import { UserRepository } from './user-model';
import { UserService } from './user-service';

const PostgresClient = createPostgresClient({
  connectionString: process.env.DATABASE_URL,
  // Or individual options:
  // host: 'localhost',
  // port: 5432,
  // database: 'myapp',
  // username: 'user',
  // password: 'pass',
});

const cluster = createCluster({
  services: [PostgresClient, UserRepository, UserService],
});

await cluster.serve({ http: 3000 });

Transactions

The PostgreSQL client supports automatic transaction management with savepoints for nested transactions:

transactions.tsTypeScript
import { AbstractPostgresClient } from '@justscale/postgres';
import { UserRepository } from './user-model';

const UserService = createService({
  inject: {
    client: AbstractPostgresClient,
    users: UserRepository,
  },
  factory: ({ client, users }) => ({
    async transfer(fromId: string, toId: string, amount: number) {
      // All operations in this transaction succeed or fail together
      await client.transaction(async () => {
        const from = await users.findById(fromId);
        const to = await users.findById(toId);

        if (!from || !to) throw new Error('User not found');
        if (from.balance < amount) throw new Error('Insufficient funds');

        await users.update(fromId, { balance: from.balance - amount });
        await users.update(toId, { balance: to.balance + amount });
      });
    },
  }),
});
ℹ️

Info

The PostgreSQL adapter uses AsyncLocalStorage to automatically apply the current transaction context to all queries. You do not need to pass a transaction object around.

Nested Transactions (Savepoints)

Nested transactions are automatically handled using PostgreSQL savepoints:

nested-transactions.tsTypeScript
import { AbstractPostgresClient } from '@justscale/postgres';

const service = createService({
  inject: { client: AbstractPostgresClient, users: UserRepository },
  factory: ({ client, users }) => ({
    async complexOperation() {
      await client.transaction(async () => {
        await users.insert({ email: 'a@example.com', name: 'Alice' });

        // Nested transaction = savepoint
        await client.transaction(async () => {
          await users.insert({ email: 'b@example.com', name: 'Bob' });
          // If this throws, only Bob's insert is rolled back
        });

        await users.insert({ email: 'c@example.com', name: 'Carol' });
      });
    },
  }),
});

Transaction Isolation Levels

Configure isolation levels for specific transactions:

isolation-levels.tsTypeScript
import { AbstractPostgresClient } from '@justscale/postgres';

const service = createService({
  inject: { client: AbstractPostgresClient },
  factory: ({ client }) => ({
    async criticalOperation() {
      await client.transaction(async () => {
        // Transaction logic here
      }, { isolationLevel: 'serializable' });
      // Options: 'read uncommitted', 'read committed', 'repeatable read', 'serializable'
    },
  }),
});

After Commit Hooks

Register callbacks to run after a transaction commits:

after-commit.tsTypeScript
import { AbstractPostgresClient } from '@justscale/postgres';

const service = createService({
  inject: { client: AbstractPostgresClient, users: UserRepository },
  factory: ({ client, users }) => ({
    async createUser(email: string, name: string) {
      await client.transaction(async () => {
        const user = await users.insert({ email, name });

        // Send email only if transaction commits
        client.afterCommit(async () => {
          await sendWelcomeEmail(user.email);
        });
      });
    },
  }),
});

Identity Map

The PostgreSQL client includes an identity map for entity caching within transactions:

identity-map.tsTypeScript
import { AbstractPostgresClient } from '@justscale/postgres';

const service = createService({
  inject: { client: AbstractPostgresClient, users: UserRepository },
  factory: ({ client, users }) => ({
    async example() {
      await client.transaction(async () => {
        const user1 = await users.findById('123');
        const user2 = await users.findById('123');
        // user1 === user2 (same object instance from identity map)

        // Clear cache if needed
        client.clearIdentityMap();
      });
    },
  }),
});

Best Practices

  • Use transactions for multi-step operations - Ensure data consistency when multiple queries must succeed or fail together
  • Leverage streaming for large datasets - Avoid memory issues by processing entities one at a time or in batches
  • Configure indexes appropriately - Add indexes for frequently queried fields, but avoid over-indexing
  • Use optimistic locking - Pass expectedVersion to update/delete methods to detect concurrent modifications
  • Use afterCommit hooks - For side effects like sending emails or publishing events that should only occur on successful commit