Skip to main content

Overview

T3 Code server uses SQLite with Effect SQL for durable persistence, implementing event sourcing with CQRS projections for orchestration state.

Database Layer

SQLite Persistence

The server uses SQLite with WAL mode and foreign key constraints enabled. Source: apps/server/src/persistence/Layers/Sqlite.ts

Configuration

import { Layer } from "effect";
import { layerConfig, SqlitePersistenceMemory } from "./persistence/Layers/Sqlite";

// Production layer (uses state.sqlite in stateDir)
const productionDb = layerConfig;

// In-memory layer for testing
const testDb = SqlitePersistenceMemory;

Database Setup

On initialization, the following pragmas are applied:
  • PRAGMA journal_mode = WAL: Write-Ahead Logging for better concurrency
  • PRAGMA foreign_keys = ON: Enforce referential integrity
Migrations are automatically run on startup via runMigrations.

makeSqlitePersistenceLive

Source: apps/server/src/persistence/Layers/Sqlite.ts:38 Create a SQLite persistence layer with the specified database path.
import { Effect } from "effect";
import { makeSqlitePersistenceLive } from "./persistence/Layers/Sqlite";

const dbLayer = Effect.gen(function* () {
  return makeSqlitePersistenceLive("/path/to/state.sqlite");
}).pipe(Layer.unwrap);
Parameters:
  • dbPath: Absolute path to SQLite database file
Returns: Layer<SqlClient.SqlClient>

layerConfig

Source: apps/server/src/persistence/Layers/Sqlite.ts:52 Production persistence layer that reads database path from ServerConfig.stateDir.
const productionLayer = layerConfig;
// Database path: {stateDir}/state.sqlite
Dependencies: ServerConfig, Path.Path Provides: SqlClient.SqlClient

SqlitePersistenceMemory

Source: apps/server/src/persistence/Layers/Sqlite.ts:47 In-memory SQLite layer for testing. Data is lost when the process exits.
import { SqlitePersistenceMemory } from "./persistence/Layers/Sqlite";

const testLayer = SqlitePersistenceMemory;
Provides: SqlClient.SqlClient

Runtime Detection

The persistence layer automatically detects the runtime environment:
  • Bun: Uses @effect/sql-sqlite-bun
  • Node.js: Uses custom Node SQLite client

Event Store

OrchestrationEventStore

Durable append-only event store for orchestration events. Service Tag: t3/persistence/Services/OrchestrationEventStore Source: apps/server/src/persistence/Services/OrchestrationEventStore.ts:67

Methods

append
(event: Omit<OrchestrationEvent, 'sequence'>) => Effect<OrchestrationEvent, OrchestrationEventStoreError>
Persist a new orchestration eventActor kind is inferred from command/metadata before persistence. Sequence number is assigned by the storage layer.Parameters:
  • event: Event payload without sequence number
Returns: Stored event with assigned sequence numberErrors: OrchestrationEventStoreError on database failures
readFromSequence
(sequenceExclusive: number, limit?: number) => Stream<OrchestrationEvent, OrchestrationEventStoreError>
Replay events after the provided sequenceReads in fixed-size pages and normalizes non-integer/negative limits.Parameters:
  • sequenceExclusive: Sequence cursor (exclusive start)
  • limit: Maximum number of events to emit (optional)
Returns: Stream of ordered eventsErrors: OrchestrationEventStoreError on database failures
readAll
() => Stream<OrchestrationEvent, OrchestrationEventStoreError>
Read all events from the beginning of the streamReturns: Stream of all stored events in orderErrors: OrchestrationEventStoreError on database failures

Example Usage

import { Effect, Stream } from "effect";
import { OrchestrationEventStore } from "./persistence/Services/OrchestrationEventStore";

const program = Effect.gen(function* () {
  const eventStore = yield* OrchestrationEventStore;
  
  // Append an event
  const event = yield* eventStore.append({
    _tag: "OrchestrationEvent.ThreadCreated",
    commandId: "cmd-123",
    timestamp: new Date().toISOString(),
    // ... event payload
  });
  console.log(`Event persisted at sequence: ${event.sequence}`);
  
  // Read all events
  const events = yield* Stream.runCollect(eventStore.readAll());
  console.log(`Total events: ${events.length}`);
  
  // Replay from sequence 100
  yield* Stream.runForEach(
    eventStore.readFromSequence(100, 50),
    (event) => Effect.log(`Event ${event.sequence}: ${event._tag}`)
  );
});

Projection Repositories

Projection repositories maintain denormalized read models derived from orchestration events.

ProjectionThreadRepository

Persistence for projected thread records. Service Tag: t3/persistence/Services/ProjectionThreads/ProjectionThreadRepository Source: apps/server/src/persistence/Services/ProjectionThreads.ts:91

ProjectionThread Schema

threadId
ThreadId
required
Unique thread identifier
projectId
ProjectId
required
Parent project identifier
title
string
required
Thread title
model
string
required
Active model name (e.g., "o3-mini", "claude-3.5-sonnet")
runtimeMode
RuntimeMode
required
Runtime mode: "editor" or "agent"
interactionMode
ProviderInteractionMode
required
Provider interaction mode: "blocking" or "streaming"
branch
string | null
Git branch name if applicable
worktreePath
string | null
Git worktree path if applicable
latestTurnId
TurnId | null
ID of the most recent turn
createdAt
IsoDateTime
required
Thread creation timestamp
updatedAt
IsoDateTime
required
Last update timestamp
deletedAt
IsoDateTime | null
Soft deletion timestamp

Methods

upsert
(thread: ProjectionThread) => Effect<void, ProjectionRepositoryError>
Insert or replace a projected thread rowUpserts by threadId.
getById
(input: { threadId: ThreadId }) => Effect<Option<ProjectionThread>, ProjectionRepositoryError>
Read a projected thread row by IDReturns: Option.Some(thread) if found, Option.None otherwise
listByProjectId
(input: { projectId: ProjectId }) => Effect<ReadonlyArray<ProjectionThread>, ProjectionRepositoryError>
List projected threads for a projectReturned in deterministic creation order.
deleteById
(input: { threadId: ThreadId }) => Effect<void, ProjectionRepositoryError>
Soft-delete a projected thread row by IDSets deletedAt timestamp without removing the row.

ProjectionProjectRepository

Persistence for projected project records. Service Tag: t3/persistence/Services/ProjectionProjects/ProjectionProjectRepository Source: apps/server/src/persistence/Services/ProjectionProjects.ts:76

ProjectionProject Schema

projectId
ProjectId
required
Unique project identifier
title
string
required
Project title
workspaceRoot
string
required
Absolute path to workspace root directory
defaultModel
string | null
Default model for new threads
scripts
Array<ProjectScript>
required
Project scripts for task automationPersisted as JSON.
createdAt
IsoDateTime
required
Project creation timestamp
updatedAt
IsoDateTime
required
Last update timestamp
deletedAt
IsoDateTime | null
Soft deletion timestamp

Methods

upsert
(project: ProjectionProject) => Effect<void, ProjectionRepositoryError>
Insert or replace a projected project rowUpserts by projectId and persists scripts through JSON encoding.
getById
(input: { projectId: ProjectId }) => Effect<Option<ProjectionProject>, ProjectionRepositoryError>
Read a projected project row by IDReturns: Option.Some(project) if found, Option.None otherwise
listAll
() => Effect<ReadonlyArray<ProjectionProject>, ProjectionRepositoryError>
List all projected project rowsReturned in deterministic creation order.
deleteById
(input: { projectId: ProjectId }) => Effect<void, ProjectionRepositoryError>
Soft-delete a projected project row by IDSets deletedAt timestamp without removing the row.

Additional Projection Repositories

The persistence layer includes additional projection repositories:
  • ProjectionTurnRepository: Turn records with status and timestamps
  • ProjectionThreadMessageRepository: Thread messages with attachments
  • ProjectionThreadActivityRepository: Thread activity logs
  • ProjectionThreadSessionRepository: Provider session records
  • ProjectionPendingApprovalRepository: Pending approval requests
  • ProjectionCheckpointRepository: Checkpoint snapshots
  • ProjectionThreadProposedPlanRepository: Proposed plan records
  • ProjectionStateRepository: Projection cursor state
Refer to apps/server/src/persistence/Services/ for detailed interfaces.

Command Receipts

OrchestrationCommandReceiptRepository

Persistence for command deduplication receipts. Service Tag: t3/persistence/Services/OrchestrationCommandReceipts Source: apps/server/src/persistence/Services/OrchestrationCommandReceipts.ts Used by the orchestration engine to deduplicate command dispatch through idempotency tracking.

Provider Session Runtime

ProviderSessionRuntimeRepository

Persistence for active provider session runtime state. Service Tag: t3/persistence/Services/ProviderSessionRuntime Source: apps/server/src/persistence/Services/ProviderSessionRuntime.ts Stores session metadata required for session resumption and directory lookups.

Error Types

OrchestrationEventStoreError

Database or decoding errors from event store operations. Source: apps/server/src/persistence/Errors.ts

ProjectionRepositoryError

Database or schema errors from projection repository operations. Source: apps/server/src/persistence/Errors.ts

Migrations

Database schema migrations are managed via Effect and executed automatically on startup. Source: apps/server/src/persistence/Migrations.ts Migration files are located in apps/server/src/persistence/Migrations/ and numbered sequentially:
  • 001_OrchestrationEvents.ts: Event store table
  • 002_OrchestrationCommandReceipts.ts: Command receipt tracking
  • 003_CheckpointDiffBlobs.ts: Checkpoint diff storage
  • 004_ProviderSessionRuntime.ts: Provider session metadata
  • 005_Projections.ts: Core projection tables
  • And more…

Example: Full Persistence Setup

import { Effect, Layer } from "effect";
import { layerConfig } from "./persistence/Layers/Sqlite";
import { OrchestrationEventStore } from "./persistence/Services/OrchestrationEventStore";
import { ProjectionThreadRepository } from "./persistence/Services/ProjectionThreads";
import { ServerConfig } from "./config";

const program = Effect.gen(function* () {
  // Access services
  const eventStore = yield* OrchestrationEventStore;
  const threadRepo = yield* ProjectionThreadRepository;
  
  // Read all events
  const events = yield* Stream.runCollect(eventStore.readAll());
  console.log(`Total events: ${events.length}`);
  
  // List threads for a project
  const threads = yield* threadRepo.listByProjectId({ projectId: "project-1" });
  console.log(`Threads: ${threads.length}`);
});

// Run with production persistence
const main = program.pipe(
  Effect.provide(OrchestrationEventStoreLive),
  Effect.provide(ProjectionThreadRepositoryLive),
  Effect.provide(layerConfig),
  Effect.provide(ServerConfig.layer),
);

Effect.runPromise(main);