Valet

Client SDK Internals

React integration, local storage, and code generation.

The client SDK is a TypeScript library that provides real-time sync, local-first storage, code generation, and React bindings. It lives in valet/src/ across six modules.

valet/src/
├── index.ts — Package exports
├── core/ — Protocol, subscriptions, logging
│   ├── index.ts — Module exports
│   ├── protocol.ts — Wire format types, serialization, type guards
│   ├── subscription.ts — Subscription state machine, document cache
│   └── logger.ts — Tagged structured logging
├── server/ — Schema and function definitions (runs at codegen time)
│   ├── index.ts — Module exports
│   ├── schema.ts — defineTable, defineSchema, TableBuilder, sync config
│   ├── validators.ts — v.string(), v.number(), type inference helpers
│   ├── functions.ts — defineQuery, defineMutation, execution modes
│   ├── context.ts — QueryContext, MutationContext, Id<T>, DatabaseReader/Writer
│   ├── db.ts — GenericDatabaseQueryBuilder, filter builders, QuerySpec
│   └── auth.ts — Built-in auth table definitions
├── qb/ — Query builder (used by both codegen and runtime)
│   ├── index.ts — Module exports
│   └── query.ts — QueryBuilder, SyncFilterBuilder, context proxy
├── react/ — React integration
│   ├── index.ts — Module exports
│   ├── client.ts — ValetClient: WebSocket connection, mutations, subscriptions
│   ├── auth-client.ts — ValetAuthClient: HTTP auth (sign-up/sign-in/sign-out)
│   ├── hooks.tsx — useQuery, useMutation, useConnectionState, etc.
│   ├── auth-provider.tsx — ValetAuthProvider, useAuth
│   └── factory.tsx — createValetProvider: wires everything together
├── local/ — Local-first storage
│   ├── index.ts — Module exports
│   ├── database.ts — LocalDatabase: wa-sqlite WASM wrapper
│   ├── sync.ts — SyncEngine: applies server deltas to local DB
│   ├── context.ts — LocalDatabaseWriter, ReplayableEnv: deterministic replay
│   └── mutation-log.ts — MutationLog: persistent log for deterministic replay
└── codegen/ — Code generation
    ├── index.ts — Module exports
    ├── cli.ts — CLI entry point, watch mode, push to server
    ├── parser.ts — Parse schema.ts and function files into AST
    └── generator.ts — Generate api.js/d.ts, handlers, schema.json, react exports

Core module

Protocol (protocol.ts)

Wire format for WebSocket communication. Protocol version: 4.

Client → Server:
Auth { token }
Subscribe { sub_id, function, args, last_sync_version? }
Unsubscribe { sub_id }
SyncWrite { request_id, table, operation, doc, function?, args? }
Server → Client:
Hello { protocol_version }
AuthOk { user_id, expires_at? }
AuthError { code, message }
SubscribeResponse { sub_id, status, sync_type, data, version }
Sync { deltas: Delta[] }
SyncWriteResult { request_id, success, result?, error?, rollback? }
Error { code, message }
Delta
├── sub_id, table
├── operation: "insert" | "update" | "remove" | "replace"
├── doc?, _id?, version
└── docs? — for "replace" (full snapshot)

Exports constructors (createAuthMessage, createSubscribeMessage, etc.), type guards (isHello, isAuthOk, etc.), and serialization (parseServerMessage, serializeClientMessage).
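
A minimal sketch of consuming these exports when handling a WebSocket frame. The import path and the parser's failure behavior are assumptions; the guard and field names are those listed above.

  import { parseServerMessage, isHello, isAuthOk } from 'valet'; // import path is an assumption

  const ws = new WebSocket('ws://localhost:3000/sync'); // URL is illustrative

  ws.onmessage = (event: MessageEvent<string>) => {
    const msg = parseServerMessage(event.data); // assumed to return null on malformed input
    if (!msg) return;
    if (isHello(msg)) {
      console.log('server protocol version', msg.protocol_version);
    } else if (isAuthOk(msg)) {
      console.log('authenticated as', msg.user_id);
    }
  };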

Subscription manager (subscription.ts)

Manages subscription lifecycle and document cache.

SubscriptionManager
Subscription
├── id: string
├── function: string
├── args: unknown
├── state: "pending" | "active" | "error"
├── docs: Map<string, T> — document cache by _id
├── version: number — last synced server version
└── error?: string
createSubscription(id, fn, args)
getSubscription(id) / getActiveSubscriptions() / getAllSubscriptions()
removeSubscription(id)
handleSubscribeResponse(response) → SubscriptionChange[]
├── Set state = "active" or "error"
├── Populate docs map from response.data
└── Emit changes
handleSync(sync) → SubscriptionChange[]
├── For each delta:
├── "replace" → clear docs, repopulate from delta.docs
├── "insert" → add to docs map
├── "update" → replace in docs map
└── "remove" → delete from docs map
└── Emit SubscriptionChange per mutation
onChange(listener) → unsubscribe
getDocs(subId) → T[]
getDoc(subId, docId) → T | undefined
clear()
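
A short sketch of driving the manager from incoming messages; the constructor signature and the SubscriptionChange payload shape are assumptions.

  import { SubscriptionManager } from 'valet'; // import path is an assumption

  const subs = new SubscriptionManager();
  subs.createSubscription('sub-1', 'todos.list', { userId: 'u1' });

  const stop = subs.onChange(() => {
    console.log('todos now', subs.getDocs('sub-1'));
  });

  // Feed server frames into the manager as they arrive:
  // subs.handleSubscribeResponse(response);
  // subs.handleSync(sync);

  stop(); // remove the change listener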

Logger (logger.ts)

Logger
├── level: DEBUG | INFO | WARN | ERROR
├── enabled: boolean
└── tag: string
debug / info / warn / error (message, metadata?)
setLevel / setEnabled / getLevel / isEnabled
createLogger(tag, options?) → Logger
defaultLogger — tag: "Valet"
Global configuration:
configureLogger(config) — Set global log level/enabled state
getLoggerConfig() → LoggerConfig
resetLoggerConfig() — Reset to defaults
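
Typical usage, as a sketch; the configureLogger options mirror the fields listed above, but the exact shape is an assumption.

  import { createLogger, configureLogger } from 'valet'; // import path is an assumption

  configureLogger({ level: 'DEBUG', enabled: true }); // assumed config shape

  const log = createLogger('SyncEngine');
  log.debug('applying delta', { table: 'todos', version: 42 });
  log.warn('schema hash changed, running migrations');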

Server module

Provides the schema and function definition APIs that user code is written against. These definitions are consumed at codegen time to produce runtime artifacts.

Validators (validators.ts)

Validator<T>
├── type: string — runtime type name
├── _type: T — phantom type for inference
├── isOptional?: boolean
├── isDeprecated?: boolean
└── deprecated() → Validator<T>
v.string() → StringValidator
v.number() → NumberValidator
v.int() → IntValidator
v.boolean() → BooleanValidator
v.bytes() → BytesValidator
v.null() → NullValidator
v.id(table) → IdValidator<Table>
v.array(element) → ArrayValidator<T>
v.object(fields) → ObjectValidator<{ ... }>
v.optional(inner) → OptionalValidator<T>
v.union(...vars) → UnionValidator<T>
v.literal(value) → LiteralValidator<T>
Type-level helpers:
Infer<V> — extract TS type from a validator
InferFields<T> — extract TS type from a record of validators
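
A sketch of composing validators and recovering their TypeScript types; the import path is an assumption.

  import { v, type Infer } from 'valet';

  const todoValidator = v.object({
    text: v.string(),
    done: v.boolean(),
    ownerId: v.id('users'),
    tags: v.optional(v.array(v.string())),
    priority: v.union(v.literal('low'), v.literal('high')),
  });

  // Inferred as:
  // { text: string; done: boolean; ownerId: Id<'users'>; tags?: string[]; priority: 'low' | 'high' }
  type Todo = Infer<typeof todoValidator>;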

Schema (schema.ts)

TableBuilder<T extends FieldDefinitions>
├── _fields: T
├── _indexes: IndexDefinition[]
├── _syncConfig?: SyncConfig
├── _backfills: Array<{ column, fn }>
├── index(name, fields) → this
├── sync(config) → this
├── backfill(column, fn) → this
└── compileSyncConfig(ctx) → CompiledSyncConfig
defineTable(fields) → TableBuilder<T>
defineSchema(tables) → CompiledSchema<T>
SyncConfig
├── filter?: (q, ctx) => FilterExpr — per-user row filtering
├── mode: "full" | "windowed" | "none"
└── window?: { field, duration }
IndexDefinition { name, fields: string[] }

Chainable: defineTable({...}).index('by_user', ['userId']).sync({ mode: 'full', filter: ... }).backfill('col', fn)
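
A schema sketch combining an index, a per-user sync filter, and a windowed table. Field names are illustrative and the window duration unit is an assumption.

  import { defineSchema, defineTable, v } from 'valet'; // import path is an assumption

  export default defineSchema({
    todos: defineTable({
      text: v.string(),
      done: v.boolean(),
      userId: v.id('users'),
    })
      .index('by_user', ['userId'])
      .sync({ mode: 'full', filter: (q, ctx) => q.eq('userId', ctx.auth.userId) }),

    events: defineTable({
      name: v.string(),
      createdAt: v.number(),
    }).sync({
      mode: 'windowed',
      window: { field: 'createdAt', duration: 7 * 24 * 60 * 60 * 1000 }, // assumed to be milliseconds
    }),
  });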

Functions (functions.ts)

defineQuery(definition) → RegisteredQuery
defineQuery({
args: { userId: v.id('users') },
execution: 'local' | 'server' | 'fetch', — default: 'server'
handler: async (ctx, args) => { ... }
})
defineMutation(definition) → RegisteredMutation
defineMutation({
args: { text: v.string() },
execution: 'local' | 'server', — default: 'local'
handler: async (ctx, args) => { ... }
})
RegisteredQuery<DataModel, Args, Returns>
├── _type: 'query'
├── _args: InferArgs<Args> — phantom
├── _returns: Returns — phantom
├── execution: QueryExecutionMode
└── definition
RegisteredMutation<DataModel, Args, Returns>
├── _type: 'mutation'
├── _args: InferArgs<Args> — phantom
├── _returns: Returns — phantom
├── execution: MutationExecutionMode
└── definition
FunctionArgs<F> — extract args type from registered function
FunctionReturns<F> — extract return type from registered function

Context types (context.ts)

Id<TableName> — branded string carrying table name at type level
QueryContext<DataModel>
├── db: DatabaseReader<DataModel>
└── auth?: AuthInfo { userId, claims? }
MutationContext<DataModel>
├── db: DatabaseWriter<DataModel>
└── auth?: AuthInfo
DatabaseReader<DM>
├── query(table) → DatabaseQueryBuilder<DocType>
├── get(id) → Doc | null
└── search(table, field, options) → Doc[]
DatabaseWriter<DM> extends DatabaseReader<DM>
├── insert(table, doc) → Id<TableName>
├── patch(id, updates) → void
├── replace(id, doc) → void
└── delete(id) → void
DatabaseQueryBuilder<T>
├── filter(fn) → self
├── order(field, direction?) → self
├── take(n) / skip(n) → self
├── paginate(options) → PaginatedResult<T>
├── collect() → T[]
├── first() → T | null
└── unique() → T
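
A handler sketch exercising this context surface. Table and field names are illustrative, and the filter callback is assumed to receive the filter builder described under db.ts below.

  import { defineMutation, v } from 'valet'; // import path is an assumption

  export const toggleTodo = defineMutation({
    args: { id: v.id('todos') },
    execution: 'local',
    handler: async (ctx, args) => {
      const todo = await ctx.db.get(args.id);
      if (!todo) return null;
      await ctx.db.patch(args.id, { done: !todo.done });
      return await ctx.db
        .query('todos')
        .filter((q) => q.eq('done', false))
        .order('createdAt', 'desc')
        .collect();
    },
  });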

Database implementation (db.ts)

Concrete implementations of the context interfaces.

GenericFilterBuilder<T>
├── eq / neq / lt / lte / gt / gte
├── oneOf / isNull / isNotNull / arrayContains
├── and / or / not
└── Each returns FilterExpression (opaque branded type)
GenericDatabaseQueryBuilder<T>
├── spec: QuerySpec
├── executor: QueryExecutor
└── Implements DatabaseQueryBuilder<T>
QuerySpec
├── table, filters: InternalFilterExpr[]
├── index?: { name, constraints: IndexConstraint[] }
├── orderBy?, limit?, offset?, cursor?
└── Used by both local and server query execution
QueryExecutor (interface)
├── execute(spec) → T[]
├── get(id) / insert / patch / replace / delete
└── search(table, field, options) → T[]

FilterExpression is an opaque branded type; its internal representation is InternalFilterExpr with a type discriminant, and wrapExpr / unwrapExpr convert between the two.


Query builder module (qb/)

QueryBuilder (query.ts)

Used at runtime in React hooks to build subscription queries.

QueryBuilder
├── filters: FilterExpr[]
├── orderBySpecs: OrderBy[]
├── limitValue?, offsetValue?
├── eq / neq / lt / lte / gt / gte
├── oneOf / isNull / isNotNull / arrayContains
├── in(field, subquery)
├── or(...queries) / not()
├── order(field, direction?) / limit(n) / offset(n)
├── build() → QueryArgs
├── buildFilter() → Record<string, unknown>
└── buildFilterExpr() → FilterExpr
query() → QueryBuilder — factory function
select(field) → SubqueryBuilder — for IN clauses
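
A sketch of building subscription arguments at runtime; the shape of the returned QueryArgs is an assumption.

  import { query } from 'valet'; // import path is an assumption

  const args = query()
    .eq('done', false)
    .oneOf('priority', ['high', 'urgent'])
    .order('createdAt', 'desc')
    .limit(50)
    .build();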

SyncFilterBuilder

Used in .sync({ filter: (q, ctx) => q.eq(...) }) definitions. Methods match QueryBuilder but return FilterExpr directly instead of chaining.

Context proxy

createContextProxy(basePath?) → Proxy
Captures property access chains and converts them to $var references:
ctx.auth.userId → { $var: "auth.userId" }
Used at codegen time to serialize context references in sync filters.

React module

ValetClient (client.ts)

Core WebSocket client. Manages connection, authentication, subscriptions, and mutations.

ValetClient
├── ws: WebSocket | null
├── subscriptionManager: SubscriptionManager
├── pendingMutations: Map<string, PendingMutation>
├── mutationQueue: QueuedMutation[] — offline queue
├── syncEngine?: SyncEngine
├── handlers?: HandlerRegistry
├── connectionState: "disconnected" | "connecting" | "connected" | "reconnecting"
├── authState: "pending" | "authenticated" | "unauthenticated" | "error"
├── authUserId?: string
├── authExpiresAt?: number
└── authError?: AuthError
Connection:
connect() — open WS, send Auth, wait for AuthOk
disconnect() — close WS
getConnectionState()
onConnectionStateChange(listener) → unsubscribe
Auth:
getAuthState() / getAuthUserId() / getAuthError()
onAuthStateChange(listener) → unsubscribe
Subscriptions:
subscribe(function, args, callback) → QueryHandle<T>
unsubscribe(subId)
getSubscriptionData(subId)
Mutations:
mutate(function, args) — determines execution mode, dispatches
mutateLocal(function, args) — execute handler locally, log mutation, queue for sync
mutateServer(function, args) — send to server, wait for result
mutateOptimistic(function, args) — local + server with rollback on rejection
replayMutation(logged) — replay from MutationLog with captured env
rebasePendingMutations() — clear local state, replay all pending against fresh server data
rebaseAndFlush() — wait for syncs to settle, rebase, then flush queue
Message handling:
handleMessage → handleHello / handleAuthOk / handleAuthError / handleSubscribeResponse / handleSync / handleSyncWriteResult / handleError

Reconnection: exponential backoff from initialReconnectDelay (1s) to maxReconnectDelay (30s), up to maxReconnectAttempts (10). Resubscribes all active subscriptions on reconnect, then runs rebaseAndFlush() to replay pending mutations against fresh server state.

Offline mutations: when disconnected, mutateLocal() runs the handler locally with a ReplayableEnv that captures non-deterministic values. The mutation is logged to MutationLog as { functionName, args, capturedEnv } and queued as a QueuedMutation for sending when reconnected.

Rebase on reconnect: after reconnection, the client receives fresh server state via subscription responses. rebasePendingMutations() then:

  1. Clears all local data written by pending mutations
  2. Replays each LoggedMutation in sequence order against the fresh server state
  3. Uses withReplayableGlobals(env) to ensure identical IDs, timestamps, and random values
  4. The replayed local state now reflects server truth + pending local changes
  5. flushMutationQueue() sends the queued mutations to the server for confirmation
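
A sketch of using ValetClient directly, outside React. The constructor options and the QueryHandle field names are assumptions; the method names are those listed above.

  import { ValetClient } from 'valet'; // import path is an assumption

  const client = new ValetClient({
    url: 'ws://localhost:3000/sync',        // illustrative
    tokenProvider: async () => 'jwt-token', // assumed option name
  });

  await client.connect();

  const handle = client.subscribe('todos.list', { userId: 'u1' }, (docs) => {
    console.log('live todos', docs);
  });

  await client.mutate('todos.create', { text: 'Buy milk' });

  client.unsubscribe(handle.subId); // assumed QueryHandle field
  client.disconnect();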

ValetAuthClient (auth-client.ts)

HTTP client for built-in auth. Stores session in localStorage (or custom AuthStorage).

ValetAuthClient
├── config: ValetAuthConfig
├── storage: AuthStorage
├── currentUser: AuthUser | null
├── token: string | null
├── expiresAt: number | null
└── state: "loading" | "authenticated" | "unauthenticated"
signUp({ email, password, name? }) → { user }
signIn({ email, password }) → { user }
signOut()
getTokenProvider() → () => Promise<string> — for use with ValetClient
getUser() / getState() / getToken()
Storage keys: valet_auth_token, valet_auth_user, valet_auth_expires_at
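
A sketch of the built-in auth flow and of wiring its token provider into the WebSocket client; the config field names are assumptions.

  import { ValetAuthClient, ValetClient } from 'valet'; // import path is an assumption

  const auth = new ValetAuthClient({ url: 'http://localhost:3000' }); // assumed config shape

  await auth.signUp({ email: 'ada@example.com', password: 'hunter2', name: 'Ada' });
  const { user } = await auth.signIn({ email: 'ada@example.com', password: 'hunter2' });
  console.log('signed in as', user);

  const client = new ValetClient({
    url: 'ws://localhost:3000/sync',
    tokenProvider: auth.getTokenProvider(), // assumed option name
  });

  await auth.signOut();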

React hooks (hooks.tsx)

useQuery(query, args, options?) → { data, isLoading, error, isStale }
├── Detects execution mode (local / server / fetch)
├── Local: queries SyncEngine directly, re-queries on data changes
├── Server: calls client.subscribe(), returns live data
└── Options: { skip?: boolean }
useMutation(mutation, options?) → { mutate, isLoading }
├── Calls client.mutate()
└── Options: { onSuccess?, onError? }
useOptimisticMutation(mutation, options?) → { mutate, isLoading }
├── Executes handler locally first
└── Falls back on server rejection
useFetchQuery(query, args) → Promise<Returns> — One-shot fetch, no subscription
useValetClient() → ValetClient
useConnectionState() → ConnectionState
useIsConnected() → boolean
useIsReconnecting() → boolean
useValetAuth() → { user, state, signUp, signIn, signOut, ... }
fetchQuery(client, query, args) → Promise<Returns> — Non-hook version for use outside React
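
A component sketch combining the generated api object with these hooks; api.todos.* and the document fields are illustrative, and the hooks are assumed to come from the generated react exports.

  function TodoList({ userId }: { userId: string }) {
    const { data, isLoading } = useQuery(api.todos.list, { userId });
    const { mutate: createTodo, isLoading: saving } = useMutation(api.todos.create);

    if (isLoading) return <p>Loading…</p>;
    return (
      <div>
        <button disabled={saving} onClick={() => createTodo({ text: 'Buy milk' })}>
          Add
        </button>
        <ul>{data?.map((t) => <li key={t._id}>{t.text}</li>)}</ul>
      </div>
    );
  }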

Provider factory (factory.tsx)

Wires together LocalDatabase, SyncEngine, ValetClient, and React context.

createValetProvider(config) → {
ValetProvider, useQuery, useMutation, useOptimisticMutation, useFetchQuery,
useValetClient, useConnectionState, useIsConnected, useIsReconnecting,
useValetAuth, fetchQuery
}
config:
├── schema: JsonSchema | SyncSchema
├── handlers?: HandlerRegistry
├── dbName?: string — default: "valet.db"
└── locateWasm?: (file) => string
Initialization sequence:
1. Initialize LocalDatabase with wa-sqlite WASM
2. Create SyncEngine with schema + backfills
3. Initialize tables (create/migrate from schema)
4. Create ValetClient with syncEngine + handlers
5. Render children (show loadingFallback while init, errorFallback on failure)
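
A wiring sketch for the app root. The generated-file import paths, the handlers export name, and the ValetProvider prop names are assumptions.

  import { createValetProvider } from 'valet'; // import path is an assumption
  import schema from './_generated/valet/schema.json';
  import { handlers } from './_generated/valet/handlers.js';

  const { ValetProvider, useQuery, useMutation } = createValetProvider({
    schema,
    handlers,
    dbName: 'app.db',
    locateWasm: (file: string) => `/wasm/${file}`,
  });

  export function App() {
    return (
      <ValetProvider url="ws://localhost:3000/sync" loadingFallback={<p>Loading…</p>}>
        {/* TodoList is the component from the hooks sketch above */}
        <TodoList userId="u1" />
      </ValetProvider>
    );
  }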

Auth provider (auth-provider.tsx)

<ValetAuthProvider url={serverUrl} storage={storage?}>
{children}
</ValetAuthProvider>
Provides:
├── authClient: ValetAuthClient
├── user: AuthUser | null
├── state: AuthState
├── isAuthenticated / isLoading
├── signUp / signIn / signOut
└── Accessed via useAuth() hook

Local storage module

LocalDatabase (database.ts)

Wraps wa-sqlite for local SQLite storage.

LocalDatabase
├── db: DB | null — wa-sqlite adapter (exec, execO, execA, tx, onUpdate)
├── initialized: boolean
├── changeListeners: Set<ChangeListener>
└── unsubscribeUpdate: (() => void) | null
init(options?) — load WASM, open DB
createTable(name, schema) — CREATE TABLE IF NOT EXISTS
dropTable(name) / tableExists(name)
generateId() → string — crypto.randomUUID or fallback
CRUD:
insert(table, doc) → id
update(table, id, updates) — partial update
replace(table, id, doc) — full replacement
delete(table, id)
query(table, options?) → T[] — filter/order/limit/offset
onDatabaseChange(listener) → unsubscribe
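
A standalone usage sketch; the init options, column-schema shape, and query options are assumptions, while the method names match the listing above.

  import { LocalDatabase } from 'valet'; // import path is an assumption

  const db = new LocalDatabase();
  await db.init({ dbName: 'scratch.db' }); // assumed option name

  await db.createTable('todos', { text: 'TEXT', done: 'INTEGER' }); // assumed schema shape

  const id = await db.insert('todos', { text: 'Buy milk', done: 0 });
  await db.update('todos', id, { done: 1 });
  const open = await db.query('todos', { filter: { done: 0 }, limit: 10 }); // assumed options shape
  console.log(open);

  const stop = db.onDatabaseChange(() => console.log('local db changed'));
  stop();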

SyncEngine (sync.ts)

Applies server subscription data and deltas to the local database. Handles schema migrations and backfills.

SyncEngine
├── db: LocalDatabase
├── schema: SyncSchema
├── schemaHash?: string
├── compiledBackfills: CompiledBackfill[]
├── syncStates: Map<string, SyncState>
├── initializedTables: Set<string>
├── onSyncStateChange?: callback
└── onDataChange?: callback
initializeTables()
├── For each table in schema:
├── Create table if not exists (fields → column types)
└── Add missing columns (schema migration)
├── Detect schema hash change → run migrations
└── Execute backfills for affected tables
handleSubscribeResponse(response)
├── Full sync: clear table, insert all docs
├── Delta sync: apply deltas since last version
└── Update SyncState { subId, table, version, initialized }
handleSync(sync)
├── For each delta:
├── insert → db.insert()
├── update → db.update()
├── remove → db.delete()
└── replace → clear + insert all
└── Notify data change listeners
Local CRUD (for mutations):
insertDoc / updateDoc / deleteDoc / getDoc / queryDocs
SyncSchema = Record<string, { fields: Record<string, SchemaField> }>
SchemaField { type, isOptional? }

Local context (context.ts)

Implements DatabaseWriter backed by SyncEngine for local mutation execution. Also provides the deterministic replay infrastructure.

LocalDatabaseWriter
├── operations: TrackedOperation[] — tracks mutations for offline queue
├── replayableEnv?: ReplayableEnv — captures non-deterministic values
├── query(table) → LocalQueryBuilder
├── get(id) → T | null
├── insert(table, doc) → Id — uses replayableEnv.generateId()
├── patch(id, updates)
├── replace(id, doc)
└── delete(id) — captures pre-delete state
TrackedOperation { type, table, docId, preDeleteDoc? }
LocalQueryBuilder<T>
├── In-memory filter, order, limit, skip
├── collect() / first() / unique()
└── paginate(options) → PaginatedResult
createLocalMutationContext(syncEngine, auth?) → MutationContext
createLocalQueryContext(syncEngine, auth?) → QueryContext

Deterministic replay (context.ts)

Local mutations must produce identical results when replayed during rebase. Three sources of non-determinism are captured on first execution and replayed identically on subsequent runs.

ReplayableEnv
├── generatedIds: string[] — captured crypto.randomUUID() calls
├── randomValues: number[] — captured Math.random() calls
├── now: number — captured Date.now() value
├── idIndex / randomIndex — replay counters
├── generateId() → string — capture on first run, replay on rebase
├── random() → number — capture on first run, replay on rebase
├── now() → number — capture on first run, replay on rebase
└── capture() → CapturedEnv — serialize for persistence in MutationLog
CapturedEnv { generatedIds, randomValues, now }
withReplayableGlobals(env, fn) → Promise<T>
├── Patches global crypto.randomUUID → env.generateId()
├── Patches global Math.random → env.random()
├── Patches global Date.now → env.now()
├── Executes fn()
└── Restores originals (even on error)
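
A capture-then-replay sketch. The ReplayableEnv constructors are assumptions (in particular rehydrating from a CapturedEnv); the patched globals and capture() follow the listing above.

  import { ReplayableEnv, withReplayableGlobals } from 'valet'; // import path is an assumption

  // First (live) execution: patched globals record every non-deterministic value.
  const env = new ReplayableEnv();
  await withReplayableGlobals(env, async () => {
    const id = crypto.randomUUID(); // recorded in env.generatedIds
    const ts = Date.now();          // recorded as env.now
    return { id, ts };
  });
  const captured = env.capture();   // persisted alongside the mutation in the MutationLog

  // Rebase: replaying with the captured values yields identical IDs and timestamps.
  const replayEnv = new ReplayableEnv(captured); // assumed constructor signature
  await withReplayableGlobals(replayEnv, async () => {
    const id = crypto.randomUUID(); // returns the originally captured ID
    const ts = Date.now();          // returns the originally captured timestamp
    return { id, ts };
  });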

Mutation log (mutation-log.ts)

Persistent log of pending mutations. Each entry stores the function call (not a data snapshot) plus the captured environment, enabling deterministic replay.

MutationLog
├── storage: MutationLogStorage
├── sequence: number — monotonic counter
├── log(functionName, args, env) → LoggedMutation
├── getPending() → LoggedMutation[] — sorted by sequence
├── confirm(mutationId) — mark as confirmed, remove
├── fail(mutationId) — mark as failed, remove
└── clear()
LoggedMutation
├── mutationId: string
├── functionName: string
├── args: unknown
├── env: CapturedEnv — captured non-deterministic values
├── baseVersion: number — server version at time of mutation
├── sequence: number — local ordering
├── timestamp: number
└── status: "pending" | "confirmed" | "failed"
MutationLogStorage (interface)
├── getAll() → LoggedMutation[]
├── append(mutation)
├── remove(mutationId)
└── clear()
InMemoryMutationLogStorage — default implementation
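
A sketch of logging and settling a pending mutation; the MutationLog constructor signature is an assumption.

  import { MutationLog, InMemoryMutationLogStorage, ReplayableEnv } from 'valet'; // import path is an assumption

  const log = new MutationLog(new InMemoryMutationLogStorage()); // assumed constructor
  const env = new ReplayableEnv();

  const logged = log.log('todos.create', { text: 'Buy milk' }, env.capture());

  // On reconnect, replay pending entries in sequence order
  // (client.replayMutation re-runs each handler under withReplayableGlobals(entry.env)):
  for (const entry of log.getPending()) {
    console.log('pending', entry.functionName, entry.sequence);
  }

  // Then confirm or fail based on the server's SyncWriteResult:
  log.confirm(logged.mutationId);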

Codegen module

Parser (parser.ts)

Parses schema.ts and function files into a structured AST.

parseValetDir(valetDir) → ParsedValetDir
├── parseSchemaFile(schemaPath) → ParsedSchema
├── Find defineSchema({...}) call
├── For each table:
├── Extract defineTable({...}) fields
├── Parse field types recursively → ParsedType
├── Detect .deprecated() chains
├── Extract .index() calls
├── Extract .sync() config
└── Extract .backfill('col', fn) calls (table-level)
└── Return { tables, backfills }
└── parseFunctionFile(path, tableName) → ParsedFunctionFile
├── Find defineQuery / defineMutation calls
├── Extract args validators
├── Extract execution mode
├── Extract handler source code
└── Return { tableName, queries, mutations }
ParsedType (recursive): primitive | array | object | union | literal | optional | unknown
ParsedField { name, type, parsedType?, isOptional, isDeprecated? }
ParsedTable { name, fields, indexes, sync? }
ParsedBackfill { table, column, code, paramName }
ParsedSchema { tables, backfills }
ParsedQuery { name, exportName, table, args, execution, handlerSource, returnType? }
ParsedMutation { name, exportName, table, args, execution, handlerSource, returnType? }

Generator (generator.ts)

Produces runtime artifacts from parsed AST.

generate(options) → GeneratedFiles
options: { valetDir, outputDir }
Generated files:
├── api.js — runtime api object with function references
├── api.d.ts — full type declarations (DataModel, typed api, phantom types)
├── handlers.js — handler registry for local execution (conditional: only when local queries or mutations exist)
├── handlers.d.ts — handler type declarations (conditional: only when local queries or mutations exist)
├── schema.json — table definitions for server (fields, indexes, backfills)
├── functions.json — handler code for server (name, type, execution, code) (conditional: only when any queries or mutations exist)
├── react.js — pre-configured provider and hooks
└── react.d.ts — typed React exports

CLI (cli.ts)

valet codegen [options]
--valet-dir <path> — source directory (default: ./valet)
--output-dir <path> — output directory (default: ./_generated/valet)
--server <url> — server URL (default: http://localhost:3000)
--deploy-key <key> — deploy key (or VALET_DEPLOY_KEY env)
--watch, -w — watch for changes
--help, -h
Sequence:
1. Parse schema + functions
2. Generate all output files
3. Push schema.json + functions.json to server (if --server)
4. If --watch: watch valet dir, re-run on change

Data flow

Subscription lifecycle

useQuery(api.todos.list, { userId })

ValetClient.subscribe("todos.list", args, callback)

SubscriptionManager.createSubscription() — state: "pending"

WebSocket → Subscribe { sub_id, function, args }

Server processes, returns SubscribeResponse

SubscriptionManager.handleSubscribeResponse()
state: "active", docs populated

SyncEngine.handleSubscribeResponse() — write to LocalDatabase

callback(data) → React re-render

Ongoing sync loop:
Server sends Sync { deltas }
→ SubscriptionManager.handleSync()
→ SyncEngine.handleSync()
→ callback(data) → React re-render

Local mutation lifecycle (deterministic replay model)

useMutation(api.todos.create)

mutate({ text: "Buy milk" })

mutateLocal()
1. Create ReplayableEnv (captures non-determinism)
2. withReplayableGlobals(env, async () => {
createLocalMutationContext(syncEngine)
Run handler with LocalDatabaseWriter
Track operations → TrackedOperation[]
} )
3. mutationLog.log(functionName, args, env.capture())
4. Queue mutation for sending
If connected:
WebSocket → SyncWrite { request_id, function, args }
Server responds → SyncWriteResult
success: mutationLog.confirm(id)
error + rollback: undo local changes
If disconnected:
Mutation stays in log + queue
On reconnect:
1. Server sends fresh state via subscriptions
2. rebasePendingMutations():
Clear local mutation data
For each LoggedMutation (in sequence order):
replayMutation(logged) → withReplayableGlobals(logged.env, handler)
Local state = server truth + replayed mutations
3. flushMutationQueue() → send all to server

Inputs and outputs

Inputs:

  • User-defined schema.ts (table definitions, sync configs, backfills)
  • User-defined function files (query/mutation handlers)
  • WebSocket messages from server (auth, subscription data, sync deltas, write results)
  • React component props (query args, mutation args)
  • Auth tokens (JWT strings or token provider functions)

Outputs:

  • Generated code (api.js, api.d.ts, schema.json, react.js — always; handlers.js, functions.json — conditional)
  • WebSocket messages to server (auth, subscribe, sync writes)
  • Local SQLite database mutations via wa-sqlite
  • React state updates (query results, loading states, connection state)
  • HTTP auth requests (sign-up, sign-in, sign-out)
