RPC Controllers

Implementing gRPC services with contract-based controllers

Contract controllers implement gRPC service definitions imported directly from .proto files. The TypeScript compiler plugin generates types at build time, so you get full type safety without codegen.

Importing Contracts from Proto Files

Import the generated contract directly from your .proto file. The compiler plugin handles type generation automatically:

protobuf
// greeter.proto
syntax = "proto3";
package helloworld;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
  rpc SayHelloStream (HelloRequest) returns (stream HelloReply);
  rpc RecordRoute (stream Point) returns (RouteSummary);
  rpc RouteChat (stream RouteNote) returns (stream RouteNote);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}
TypeScript
import { GreeterContract } from './greeter.proto'

// The contract provides:
// - GreeterContract: Service definition with method signatures
// - HelloRequest, HelloReply: TypeScript types for messages
// - Encode/decode functions for binary serialization

Creating a Contract Controller

Use createController.implements(Contract) to create a controller that implements a gRPC service:

TypeScript
import { createController } from '@justscale/core'
import { GreeterContract } from './greeter.proto'

const GreeterController = createController
  .implements(GreeterContract)
  .create({
    inject: { db: DatabaseService },

    methods: ({ db }) => ({
      SayHello: async ({ body }) => ({
        message: `Hello, ${body.name}!`,
      }),

      // All methods from the contract must be implemented
      SayHelloStream: async function* ({ body }) {
        for (let i = 0; i < 5; i++) {
          yield { message: `Hello ${body.name} #${i + 1}` }
          await delay(1000)
        }
      },

      RecordRoute: async ({ body }) => {
        // body is AsyncIterable<Point> for client streaming
        let pointCount = 0
        for await (const point of body) {
          pointCount++
        }
        return { pointCount, distance: 0, elapsedTime: 0, featureCount: 0 }
      },

      RouteChat: async function* ({ body }) {
        // body is AsyncIterable<RouteNote> for bidi streaming
        for await (const note of body) {
          yield { location: note.location, message: `Echo: ${note.message}` }
        }
      },
    }),
  })

Streaming Modes

gRPC supports four streaming modes. The method signature determines which mode is used:

  • Unary - Single request, single response. Return a value or Promise.
  • Server streaming - Single request, stream of responses. Return an async generator.
  • Client streaming - Stream of requests, single response. body is an AsyncIterable.
  • Bidirectional - Stream of requests, stream of responses. Combine both patterns.
TypeScript
methods: () => ({
  // Unary: request -> response
  SayHello: async ({ body }) => {
    return { message: `Hello, ${body.name}!` }
  },

  // Server streaming: request -> stream of responses
  ListFeatures: async function* ({ body }) {
    const features = await db.features.findInArea(body.bounds)
    for (const feature of features) {
      yield feature
    }
  },

  // Client streaming: stream of requests -> response
  RecordRoute: async ({ body }) => {
    const points: Point[] = []
    for await (const point of body) {
      points.push(point)
    }
    return calculateRouteSummary(points)
  },

  // Bidirectional: stream of requests -> stream of responses
  RouteChat: async function* ({ body }) {
    for await (const note of body) {
      const nearby = await db.notes.findNear(note.location)
      for (const n of nearby) {
        yield n
      }
    }
  },
})

The RpcContext Object

Every RPC handler receives an RpcContext with the following properties:

TypeScript
interface RpcContext<TBody, TSession = unknown> {
  /** Request body (message for unary/server, AsyncIterable for client/bidi) */
  body: TBody

  /** gRPC metadata (headers) */
  metadata: Map<string, string>

  /** AbortSignal for cancellation */
  signal: AbortSignal

  /** Request deadline (if set by client) */
  deadline?: Date

  /** Session data (populated by auth middleware) */
  session: TSession
}
TypeScript
methods: () => ({
  SayHello: async ({ body, metadata, signal, session }) => {
    // Access request message
    const name = body.name

    // Read gRPC metadata (headers)
    const traceId = metadata.get('x-trace-id')

    // Check for cancellation
    if (signal.aborted) {
      throw new Error('Request cancelled')
    }

    // Access authenticated user (if auth middleware is used)
    const userId = session?.userId

    return { message: `Hello, ${name}!` }
  },
})

Handling Cancellation

Use the signal property to handle client cancellation gracefully:

TypeScript
ListFeatures: async function* ({ body, signal }) {
  const cursor = db.features.findInArea(body.bounds)

  for await (const feature of cursor) {
    // Check if client cancelled
    if (signal.aborted) {
      await cursor.close()
      return
    }

    yield feature
  }
},

Working with Metadata

gRPC metadata is like HTTP headers. Access incoming metadata and set response metadata:

TypeScript
methods: () => ({
  SayHello: async ({ body, metadata }) => {
    // Read incoming metadata
    const authorization = metadata.get('authorization')
    const requestId = metadata.get('x-request-id')
    const clientVersion = metadata.get('x-client-version')

    // Validate authorization
    if (!authorization) {
      throw new GrpcError(GrpcStatus.UNAUTHENTICATED, 'Missing auth')
    }

    return { message: `Hello, ${body.name}!` }
  },
})

Error Handling

Use GrpcError to return proper gRPC status codes:

TypeScript
import { GrpcError, GrpcStatus } from '@justscale/rpc'

methods: () => ({
  GetUser: async ({ body }) => {
    const user = await db.users.get(User.ref`${body.userId}`)

    if (!user) {
      throw new GrpcError(
        GrpcStatus.NOT_FOUND,
        `User ${body.userId} not found`
      )
    }

    if (!user.active) {
      throw new GrpcError(
        GrpcStatus.PERMISSION_DENIED,
        'User account is disabled'
      )
    }

    return user
  },
})

Common gRPC status codes:

  • OK (0) - Success
  • CANCELLED (1) - Operation cancelled by client
  • INVALID_ARGUMENT (3) - Invalid request parameters
  • NOT_FOUND (5) - Resource not found
  • ALREADY_EXISTS (6) - Resource already exists
  • PERMISSION_DENIED (7) - No permission
  • UNAUTHENTICATED (16) - Missing/invalid credentials
  • RESOURCE_EXHAUSTED (8) - Rate limit exceeded
  • INTERNAL (13) - Internal server error

Serving the Controller

Add the controller to your JustScale app and serve via the RPC transport:

TypeScript
import JustScale from '@justscale/core'
import '@justscale/rpc' // Auto-registers RPC transport

const app = JustScale()
  .add(GreeterController)
  .add(RouteGuideController)
  .build()

await app.serve({
  rpc: {
    port: 50051,
    enableReflection: true,  // For grpcurl/grpcui
    enableHealthCheck: true, // For load balancer probes
  },
})

console.log('gRPC server running on port 50051')

Dependency Injection

Contract controllers support full dependency injection. Inject services and use them in your method implementations:

TypeScript
import { UserService } from './services/user.service'
import { NotificationService } from './services/notification.service'
import { AuditLogService } from './services/audit.service'

const UserController = createController
  .implements(UserServiceContract)
  .create({
    inject: {
      users: UserService,
      notifications: NotificationService,
      audit: AuditLogService,
    },

    methods: ({ users, notifications, audit }) => ({
      CreateUser: async ({ body, metadata }) => {
        const user = await users.create(body)

        // Send welcome notification
        await notifications.sendWelcome(user)

        // Log the action
        await audit.log('user.created', {
          userId: user.id,
          requestId: metadata.get('x-request-id'),
        })

        return user
      },

      GetUser: async ({ body }) => {
        return users.get(User.ref`${body.userId}`)
      },
    }),
  })

Type Safety

The compiler ensures your implementation matches the contract exactly:

TypeScript
// Proto definition
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

// TypeScript catches errors at compile time:

methods: () => ({
  // Error: Method name must match proto definition
  sayHello: async ({ body }) => ({ ... }),  // Should be 'SayHello'

  // Error: Return type must match HelloReply
  SayHello: async ({ body }) => ({
    msg: 'Hello'  // Error: Property 'message' is missing
  }),

  // Error: Input type must match HelloRequest
  SayHello: async ({ body }) => ({
    message: body.username  // Error: Property 'name' expected
  }),
})