RPC Controllers
Implementing gRPC services with contract-based controllers
Contract controllers implement gRPC service definitions imported directly from .proto files. The TypeScript compiler plugin generates the types at build time, so you get full type safety without running a separate code generation step.
Importing Contracts from Proto Files
Import the generated contract directly from your .proto file. The compiler plugin handles type generation automatically:
// greeter.proto
syntax = "proto3";
package helloworld;
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
  rpc SayHelloStream (HelloRequest) returns (stream HelloReply);
  rpc RecordRoute (stream Point) returns (RouteSummary);
  rpc RouteChat (stream RouteNote) returns (stream RouteNote);
}
message HelloRequest {
  string name = 1;
}
message HelloReply {
  string message = 1;
}
// Point, RouteSummary, and RouteNote definitions are omitted here.

import { GreeterContract } from './greeter.proto'
// The contract provides:
// - GreeterContract: Service definition with method signatures
// - HelloRequest, HelloReply: TypeScript types for messages
// - Encode/decode functions for binary serialization
Creating a Contract Controller
Use createController.implements(Contract) to create a controller that implements a gRPC service:
import { setTimeout as delay } from 'node:timers/promises'
import { createController } from '@justscale/core'
import { GreeterContract } from './greeter.proto'
// Assumed path: import DatabaseService from wherever your app defines it
import { DatabaseService } from './services/database.service'
const GreeterController = createController
  .implements(GreeterContract)
  .create({
    inject: { db: DatabaseService },
    methods: ({ db }) => ({
      SayHello: async ({ body }) => ({
        message: `Hello, ${body.name}!`,
      }),
      // All methods from the contract must be implemented
      SayHelloStream: async function* ({ body }) {
        for (let i = 0; i < 5; i++) {
          yield { message: `Hello ${body.name} #${i + 1}` }
          await delay(1000)
        }
      },
      RecordRoute: async ({ body }) => {
        // body is AsyncIterable<Point> for client streaming
        let pointCount = 0
        for await (const point of body) {
          pointCount++
        }
        return { pointCount, distance: 0, elapsedTime: 0, featureCount: 0 }
      },
      RouteChat: async function* ({ body }) {
        // body is AsyncIterable<RouteNote> for bidi streaming
        for await (const note of body) {
          yield { location: note.location, message: `Echo: ${note.message}` }
        }
      },
    }),
  })
Streaming Modes
gRPC supports four call types: unary plus three streaming modes. The method signature determines which one is used:
- Unary - Single request, single response. Return a value or Promise.
- Server streaming - Single request, stream of responses. Return an async generator.
- Client streaming - Stream of requests, single response. body is an AsyncIterable.
- Bidirectional - Stream of requests, stream of responses. Combine both patterns.
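The four shapes in the list above can be tried out without any gRPC machinery. The following is a minimal, framework-independent sketch: plain async functions and async generators stand in for handlers, and the message types and helper names (fromArray, collect) are made up for illustration rather than taken from JustScale.

```typescript
// Illustrative message types; not generated from any .proto file.
interface HelloRequest { name: string }
interface HelloReply { message: string }

// Unary: one message in, one message out.
async function unary(body: HelloRequest): Promise<HelloReply> {
  return { message: `Hello, ${body.name}!` }
}

// Server streaming: one message in, an async generator of messages out.
async function* serverStream(body: HelloRequest): AsyncGenerator<HelloReply> {
  for (let i = 1; i <= 3; i++) {
    yield { message: `Hello ${body.name} #${i}` }
  }
}

// Client streaming: an AsyncIterable of messages in, one message out.
async function clientStream(body: AsyncIterable<HelloRequest>): Promise<HelloReply> {
  const names: string[] = []
  for await (const req of body) names.push(req.name)
  return { message: `Hello, ${names.join(' and ')}!` }
}

// Bidirectional: an AsyncIterable in, an async generator out.
async function* bidi(body: AsyncIterable<HelloRequest>): AsyncGenerator<HelloReply> {
  for await (const req of body) {
    yield { message: `Echo: ${req.name}` }
  }
}

// Hypothetical helper: turns an array into an AsyncIterable (a stand-in client stream).
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item
}

// Hypothetical helper: drains an AsyncIterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = []
  for await (const item of source) out.push(item)
  return out
}
```

For instance, collect(bidi(fromArray([{ name: 'x' }, { name: 'y' }]))) resolves to two replies, one per incoming message, which is the "combine both patterns" case.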
methods: ({ db }) => ({
  // Unary: request -> response
  SayHello: async ({ body }) => {
    return { message: `Hello, ${body.name}!` }
  },
  // Server streaming: request -> stream of responses
  ListFeatures: async function* ({ body }) {
    const features = await db.features.findInArea(body.bounds)
    for (const feature of features) {
      yield feature
    }
  },
  // Client streaming: stream of requests -> response
  RecordRoute: async ({ body }) => {
    const points: Point[] = []
    for await (const point of body) {
      points.push(point)
    }
    return calculateRouteSummary(points)
  },
  // Bidirectional: stream of requests -> stream of responses
  RouteChat: async function* ({ body }) {
    for await (const note of body) {
      const nearby = await db.notes.findNear(note.location)
      for (const n of nearby) {
        yield n
      }
    }
  },
})
The RpcContext Object
Every RPC handler receives an RpcContext with the following properties:
interface RpcContext<TBody, TSession = unknown> {
  /** Request body (message for unary/server, AsyncIterable for client/bidi) */
  body: TBody
  /** gRPC metadata (headers) */
  metadata: Map<string, string>
  /** AbortSignal for cancellation */
  signal: AbortSignal
  /** Request deadline (if set by client) */
  deadline?: Date
  /** Session data (populated by auth middleware) */
  session: TSession
}

methods: () => ({
  SayHello: async ({ body, metadata, signal, session }) => {
    // Access request message
    const name = body.name
    // Read gRPC metadata (headers)
    const traceId = metadata.get('x-trace-id')
    // Check for cancellation
    if (signal.aborted) {
      throw new Error('Request cancelled')
    }
    // Access authenticated user (if auth middleware is used)
    const userId = session?.userId
    return { message: `Hello, ${name}!` }
  },
})
Handling Cancellation
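Cancellation arrives as a standard AbortSignal, so the pattern can be sketched without a gRPC server. The snippet below is an illustration only: countUntilAborted and consumeUntil are hypothetical helpers, and an AbortController plays the role of a client that disconnects partway through a stream.

```typescript
// Hypothetical producer: yields 0, 1, 2, ... until `limit` or until the signal aborts.
async function* countUntilAborted(
  signal: AbortSignal,
  limit: number,
): AsyncGenerator<number> {
  for (let i = 0; i < limit; i++) {
    // Stop before doing more work if the caller has cancelled.
    if (signal.aborted) return
    yield i
  }
}

// Hypothetical consumer: reads values and cancels once it has seen `stopAt`.
async function consumeUntil(stopAt: number): Promise<number[]> {
  const controller = new AbortController()
  const received: number[] = []
  for await (const value of countUntilAborted(controller.signal, 100)) {
    received.push(value)
    // Simulate the client cancelling the call.
    if (value === stopAt) controller.abort()
  }
  return received
}
```

consumeUntil(2) resolves to [0, 1, 2]: the generator checks the signal before each yield and returns as soon as it observes the abort.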
Use the signal property to handle client cancellation gracefully:
ListFeatures: async function* ({ body, signal }) {
  const cursor = db.features.findInArea(body.bounds)
  for await (const feature of cursor) {
    // Check if client cancelled
    if (signal.aborted) {
      await cursor.close()
      return
    }
    yield feature
  }
},
Working with Metadata
gRPC metadata works like HTTP headers: key-value pairs sent alongside the request. Read incoming values from the metadata map:
import { GrpcError, GrpcStatus } from '@justscale/rpc'
methods: () => ({
  SayHello: async ({ body, metadata }) => {
    // Read incoming metadata
    const authorization = metadata.get('authorization')
    const requestId = metadata.get('x-request-id')
    const clientVersion = metadata.get('x-client-version')
    // Validate authorization
    if (!authorization) {
      throw new GrpcError(GrpcStatus.UNAUTHENTICATED, 'Missing auth')
    }
    return { message: `Hello, ${body.name}!` }
  },
})
Error Handling
Use GrpcError to return proper gRPC status codes:
import { GrpcError, GrpcStatus } from '@justscale/rpc'
methods: ({ db }) => ({
  GetUser: async ({ body }) => {
    const user = await db.users.get(User.ref`${body.userId}`)
    if (!user) {
      throw new GrpcError(
        GrpcStatus.NOT_FOUND,
        `User ${body.userId} not found`
      )
    }
    if (!user.active) {
      throw new GrpcError(
        GrpcStatus.PERMISSION_DENIED,
        'User account is disabled'
      )
    }
    return user
  },
})
Common gRPC status codes:
- OK (0) - Success
- CANCELLED (1) - Operation cancelled by client
- INVALID_ARGUMENT (3) - Invalid request parameters
- NOT_FOUND (5) - Resource not found
- ALREADY_EXISTS (6) - Resource already exists
- PERMISSION_DENIED (7) - No permission
- RESOURCE_EXHAUSTED (8) - Rate limit exceeded
- INTERNAL (13) - Internal server error
- UNAUTHENTICATED (16) - Missing/invalid credentials
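To make the mapping from failure modes to status codes concrete, here is a small self-contained sketch. The numeric values are the standard gRPC codes listed above; StatusError, getActiveUser, and toStatusCode are hypothetical stand-ins written for this example and are not the GrpcError API from @justscale/rpc.

```typescript
// Standard gRPC status codes (subset) with their fixed numeric values.
const Status = {
  OK: 0,
  CANCELLED: 1,
  INVALID_ARGUMENT: 3,
  NOT_FOUND: 5,
  ALREADY_EXISTS: 6,
  PERMISSION_DENIED: 7,
  RESOURCE_EXHAUSTED: 8,
  INTERNAL: 13,
  UNAUTHENTICATED: 16,
} as const
type StatusCode = (typeof Status)[keyof typeof Status]

// Hypothetical error type that carries a status code alongside the message.
class StatusError extends Error {
  readonly code: StatusCode
  constructor(code: StatusCode, message: string) {
    super(message)
    this.code = code
    this.name = 'StatusError'
  }
}

interface User { id: string; active: boolean }

// Hypothetical lookup that picks one status per failure mode.
function getActiveUser(users: Map<string, User>, userId: string): User {
  if (userId === '') {
    throw new StatusError(Status.INVALID_ARGUMENT, 'userId is required')
  }
  const user = users.get(userId)
  if (!user) {
    throw new StatusError(Status.NOT_FOUND, `User ${userId} not found`)
  }
  if (!user.active) {
    throw new StatusError(Status.PERMISSION_DENIED, 'User account is disabled')
  }
  return user
}

// Anything that is not a StatusError is reported as INTERNAL.
function toStatusCode(err: unknown): StatusCode {
  return err instanceof StatusError ? err.code : Status.INTERNAL
}
```

Keeping the choice of status next to the condition that triggers it, as getActiveUser does, makes it easy to see which client-visible code each branch produces.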
Serving the Controller
Add the controller to your JustScale app and serve via the RPC transport:
import JustScale from '@justscale/core'
import '@justscale/rpc' // Auto-registers RPC transport
const app = JustScale()
  .add(GreeterController)
  .add(RouteGuideController)
  .build()
await app.serve({
  rpc: {
    port: 50051,
    enableReflection: true, // For grpcurl/grpcui
    enableHealthCheck: true, // For load balancer probes
  },
})
console.log('gRPC server running on port 50051')
Dependency Injection
Contract controllers support full dependency injection. Inject services and use them in your method implementations:
import { UserService } from './services/user.service'
import { NotificationService } from './services/notification.service'
import { AuditLogService } from './services/audit.service'
const UserController = createController
  .implements(UserServiceContract)
  .create({
    inject: {
      users: UserService,
      notifications: NotificationService,
      audit: AuditLogService,
    },
    methods: ({ users, notifications, audit }) => ({
      CreateUser: async ({ body, metadata }) => {
        const user = await users.create(body)
        // Send welcome notification
        await notifications.sendWelcome(user)
        // Log the action
        await audit.log('user.created', {
          userId: user.id,
          requestId: metadata.get('x-request-id'),
        })
        return user
      },
      GetUser: async ({ body }) => {
        return users.get(User.ref`${body.userId}`)
      },
    }),
  })
Type Safety
The compiler ensures your implementation matches the contract exactly:
// Proto definition
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

// TypeScript catches errors at compile time.
// Each entry below shows a separate mistake:
methods: () => ({
  // Error: Method name must match proto definition
  sayHello: async ({ body }) => ({ ... }), // Should be 'SayHello'
  // Error: Return type must match HelloReply
  SayHello: async ({ body }) => ({
    msg: 'Hello' // Error: Property 'message' is missing
  }),
  // Error: Input type must match HelloRequest
  SayHello: async ({ body }) => ({
    message: body.username // Error: 'username' does not exist on HelloRequest (expected 'name')
  }),
})
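One way such compile-time checks can be expressed is with a mapped type over the contract. The sketch below is an assumption about the general technique, not a description of how the JustScale compiler plugin works; Handlers, implement, and GreeterMethods are hypothetical names.

```typescript
// Hypothetical hand-written equivalents of the proto messages.
type HelloRequest = { name: string }
type HelloReply = { message: string }

// A contract described as a map from method name to request/response types.
type Contract = Record<string, { request: unknown; response: unknown }>
type GreeterMethods = {
  SayHello: { request: HelloRequest; response: HelloReply }
}

// Derives the handler signature required for every method in a contract.
type Handlers<C extends Contract> = {
  [K in keyof C]: (ctx: { body: C[K]['request'] }) => Promise<C[K]['response']>
}

// Identity helper: its only job is to make the compiler check `impl` against `C`.
function implement<C extends Contract>(impl: Handlers<C>): Handlers<C> {
  return impl
}

// Type-checks: method name, input type, and return type all match the contract.
const greeter = implement<GreeterMethods>({
  SayHello: async ({ body }) => ({ message: `Hello, ${body.name}!` }),
})

// Uncommenting either line should fail type checking:
// implement<GreeterMethods>({ sayHello: async () => ({ message: '' }) }) // wrong method name
// implement<GreeterMethods>({ SayHello: async () => ({ msg: '' }) })     // wrong reply shape
```

Because implement returns its argument unchanged, the check costs nothing at runtime; all of the enforcement happens in the type system.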