Valet

Server Internals

Sync pipeline, subscriptions, and JavaScript execution.

The server is a per-project Rust process that manages a SQLite database with trigger-based change tracking, evaluates sync filters, runs JavaScript handlers, and pushes real-time deltas to connected clients over WebSocket. It lives in valet-server/src/ across 21 files.

valet-server/src/
├── main.rs — Entry point, wiring, graceful shutdown
├── ws.rs — WebSocket upgrade, client lifecycle, HTTP router
├── protocol.rs — Client/server message types, serialization
├── parse.rs — Raw JSON to ClientMessage parsing
├── pipeline.rs — Sync pipeline orchestrator
├── tailer.rs — Polls trigger-based change feed
├── hydrator.rs — Fetches full documents from row-level changes
├── matcher.rs — Evaluates filters, determines delta type
├── dispatcher.rs — Routes deltas to the right WebSocket client
├── subscription.rs — Subscription registry with multi-index lookups
├── filter.rs — JSON to FilterExpr parser
├── query.rs — Query builder for SQL generation
├── document.rs — Document CRUD trait and implementations
├── db.rs — SQLite wrapper with trigger-based change tracking
├── schema_registry.rs — Table definitions, sync configs, backfills
├── js_runtime.rs — boa_engine JavaScript execution
├── auth.rs — JWT validation and auth context
├── auth_service.rs — Sign-up/sign-in/sign-out business logic
├── auth_endpoints.rs — HTTP handlers for auth routes
├── auth_tables.rs — DDL for auth tables
└── error.rs — Shared error types

Sync pipeline

The core data flow. Runs on a 50ms poll interval via spawn_pipeline_poller.

trigger-based change feed (_valet_changes)
    ↓
Tailer
├── polls _valet_changes since last_version
└── emits ChangeBatch { changes: Vec<Change>, up_to_version }
    ↓
Hydrator
├── for each Change, SELECT full row from table
└── emits HydratedDocument { table, id, doc, version, deleted }
    ↓
Matcher
├── for each doc, evaluate all subscriptions on that table
└── emits Vec<MatchResult> { sub_id, doc_id, delta_type, doc }
    ↓
Dispatcher
└── find client for each sub_id, send ServerMessage::Sync

Pipeline struct

Pipeline
├── tailer: Tailer
├── hydrator: Hydrator
├── matcher: Matcher
├── dispatcher: Arc<Dispatcher>
└── registry: Arc<SubscriptionRegistry>
process_changes(db, batch)
├── For each Change in batch:
│   ├── hydrator.hydrate(db, change) → HydratedDocument
│   ├── matcher.match_document(registry, doc) → Vec<MatchResult>
│   └── dispatcher.dispatch_to_subscriptions(result)
└── Returns Ok(())

Tailer

Detects changes by polling the _valet_changes table, which is populated by per-table AFTER triggers.

Tailer
├── sender: broadcast::Sender<ChangeBatch>
└── last_version: i64
poll_and_broadcast(db)
├── SELECT * FROM _valet_changes WHERE db_version > last_version
├── Group into ChangeBatch
├── broadcast::send(batch) to all receivers
└── Update last_version
Change
├── table: String
├── row_id: String — plain _id string
├── operation: String — "insert" | "update" | "delete"
└── db_version: i64

Uses tokio::sync::broadcast so the pipeline and any future consumers each get their own receiver.


Hydrator

Turns row-level Changes into complete documents.

Hydrator (stateless)
hydrate(db, change) → HydratedDocument
├── If operation == "delete": return { deleted: true, doc: {} }
└── Otherwise: SELECT * FROM <table> WHERE _id = <row_id>
└── Return { table, id, doc, version, deleted: false }
hydrate_batch(db, changes) → Vec<HydratedDocument>
├── Deduplicates by (table, row_id) — keeps last change per row
└── Calls hydrate() for each unique change
HydratedDocument
├── table: String
├── id: String
├── doc: Map<String, Value>
├── version: i64
└── deleted: bool
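The batch dedup step can be sketched in a few lines (a std-only sketch; dedupe_last is a hypothetical name, and the real hydrate_batch goes on to fetch each surviving row):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
pub struct Change {
    pub table: String,
    pub row_id: String,
    pub operation: String,
    pub db_version: i64,
}

/// Keep only the last change per (table, row_id): a row that was inserted
/// then updated within one batch only needs to be hydrated once.
pub fn dedupe_last(changes: &[Change]) -> Vec<Change> {
    let mut last: HashMap<(String, String), Change> = HashMap::new();
    for c in changes {
        // Later entries in the batch overwrite earlier ones for the same row.
        last.insert((c.table.clone(), c.row_id.clone()), c.clone());
    }
    let mut out: Vec<Change> = last.into_values().collect();
    out.sort_by_key(|c| c.db_version); // stable output order for the caller
    out
}
```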

Matcher

Evaluates subscription filters against hydrated documents. Determines whether a document is an insert, update, or removal for each subscription.

Matcher (stateless)
match_document(registry, doc) → Vec<MatchResult>
├── Get all subscriptions for doc.table
├── For each subscription:
│   ├── Resolve $var references in filter using auth context
│   ├── If server_query_function set: execute_transform_handler(code, doc)
│   ├── Otherwise: matches(doc, filter, auth_context)
│   ├── Determine membership: was doc already in this subscription?
│   │   ├── Not member + matches → Insert
│   │   ├── Member + still matches → Update
│   │   └── Member + no longer matches → Remove
│   └── Push MatchResult
└── Return all results
DeltaType: Insert | Update | Remove
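The membership check reduces to a small transition table. A sketch (delta_type is a hypothetical helper; the real Matcher also carries the doc and sub_id through):

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
pub enum DeltaType { Insert, Update, Remove }

/// Compare whether the doc was already a member of the subscription
/// against whether it matches the filter now. Returns None when a
/// non-member doc still doesn't match (nothing to send).
pub fn delta_type(members: &HashSet<String>, doc_id: &str, matches_now: bool) -> Option<DeltaType> {
    let was_member = members.contains(doc_id);
    match (was_member, matches_now) {
        (false, true)  => Some(DeltaType::Insert),
        (true,  true)  => Some(DeltaType::Update),
        (true,  false) => Some(DeltaType::Remove),
        (false, false) => None,
    }
}
```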

Filter evaluation

matches(doc, filter, auth_context) → bool
FilterExpr
├── True — always matches
├── Eq { field, value } — doc[field] == value
├── Neq { field, value } — doc[field] != value
├── Lt / Lte / Gt / Gte — numeric/string comparisons
├── OneOf { field, values } — doc[field] in values
├── IsNull / IsNotNull — null checks
├── ArrayContains { field, value } — array membership
├── And { filters } — all must match
├── Or { filters } — any must match
└── Not { filter } — negation

Variable references (e.g. ctx.auth.userId) are represented as { "$var": "auth.userId" } JSON objects inside the value field of comparison variants like Eq and Neq. resolve_filter_vars walks the filter tree and replaces these $var values with actual values from the client's auth context before evaluation.
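A minimal model of both steps, assuming a simplified Filter enum with string-valued fields (the real FilterExpr has many more variants and operates on JSON values):

```rust
use std::collections::HashMap;

/// Simplified subset of FilterExpr; Val::Var stands in for the
/// { "$var": "auth.userId" } JSON form inside a comparison's value field.
#[derive(Clone, Debug)]
pub enum Filter {
    True,
    Eq { field: String, value: Val },
    And(Vec<Filter>),
    Or(Vec<Filter>),
    Not(Box<Filter>),
}

#[derive(Clone, Debug)]
pub enum Val {
    Lit(String),
    Var(String), // e.g. "auth.userId", resolved from the auth context
}

/// Walk the filter tree and replace Var references with literals from
/// the client's auth context, mirroring resolve_filter_vars.
pub fn resolve_vars(f: Filter, vars: &HashMap<String, String>) -> Filter {
    match f {
        Filter::Eq { field, value: Val::Var(path) } => Filter::Eq {
            field,
            value: Val::Lit(vars.get(&path).cloned().unwrap_or_default()),
        },
        Filter::And(fs) => Filter::And(fs.into_iter().map(|f| resolve_vars(f, vars)).collect()),
        Filter::Or(fs) => Filter::Or(fs.into_iter().map(|f| resolve_vars(f, vars)).collect()),
        Filter::Not(f) => Filter::Not(Box::new(resolve_vars(*f, vars))),
        other => other,
    }
}

/// Evaluate a resolved filter against a flat string document.
pub fn matches(doc: &HashMap<String, String>, f: &Filter) -> bool {
    match f {
        Filter::True => true,
        Filter::Eq { field, value: Val::Lit(v) } => doc.get(field) == Some(v),
        Filter::Eq { .. } => false, // unresolved Var never matches
        Filter::And(fs) => fs.iter().all(|f| matches(doc, f)),
        Filter::Or(fs) => fs.iter().any(|f| matches(doc, f)),
        Filter::Not(f) => !matches(doc, f),
    }
}
```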


Dispatcher

Maintains a map of connected clients and delivers messages to them.

Dispatcher
└── clients: RwLock<HashMap<ClientId, Arc<dyn ClientSender>>>
ClientSender (trait)
└── send(message: ServerMessage) → Result<(), String>
register_client(id, sender) — add client
remove_client(id) — drop client
dispatch_to_subscriptions(result)
├── Look up subscription → get client_id
├── Look up client → get sender
├── Build Delta from MatchResult
└── sender.send(ServerMessage::Sync { deltas })
dispatch_to_client(id, message) — direct send
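The trait-object design keeps the Dispatcher testable without sockets. A std-only sketch (RecordingSender is a hypothetical test double; the real ClientSender pushes ServerMessage values into the client's mpsc channel):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Per-client send abstraction; messages are plain Strings here.
pub trait ClientSender: Send + Sync {
    fn send(&self, message: String) -> Result<(), String>;
}

#[derive(Default)]
pub struct Dispatcher {
    clients: RwLock<HashMap<u64, Arc<dyn ClientSender>>>,
}

impl Dispatcher {
    pub fn register_client(&self, id: u64, sender: Arc<dyn ClientSender>) {
        self.clients.write().unwrap().insert(id, sender);
    }
    pub fn remove_client(&self, id: u64) {
        self.clients.write().unwrap().remove(&id);
    }
    pub fn dispatch_to_client(&self, id: u64, message: String) -> Result<(), String> {
        let clients = self.clients.read().unwrap();
        let sender = clients.get(&id).ok_or_else(|| format!("client {id} not found"))?;
        sender.send(message)
    }
}

/// Test double that records everything it was asked to send.
pub struct RecordingSender(pub Mutex<Vec<String>>);

impl ClientSender for RecordingSender {
    fn send(&self, message: String) -> Result<(), String> {
        self.0.lock().unwrap().push(message);
        Ok(())
    }
}
```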

Subscription registry

Triple-indexed store for fast lookup by ID, table, or client.

SubscriptionRegistry
├── by_id: RwLock<HashMap<SubscriptionId, Subscription>>
├── by_table: RwLock<HashMap<String, Vec<SubscriptionId>>>
└── by_client: RwLock<HashMap<ClientId, Vec<SubscriptionId>>>
Subscription
├── id: SubscriptionId
├── client_id: ClientId
├── table: String
├── filter: FilterExpr
├── created_at: DateTime<Utc>
├── last_sync_version: i64
├── members: HashSet<DocumentId> — tracks which docs are "in" this sub
├── server_query_function: Option<String>
└── server_query_args: Option<Value>
register(sub) — insert into all three indexes
get(id) — lookup by ID
get_subscriptions_for_table(table)
remove(id) — remove from all three indexes
remove_client(client_id) — bulk remove all subs for a client
is_member / add_member / remove_member / add_members
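Keeping the three indexes consistent is the registry's main invariant; remove_client has to touch all of them. A simplified, lock-free sketch (the real registry wraps each map in an RwLock and stores full Subscription structs):

```rust
use std::collections::HashMap;

#[derive(Default)]
pub struct Registry {
    by_id: HashMap<String, (String, String)>, // sub_id → (client_id, table)
    by_table: HashMap<String, Vec<String>>,   // table → sub_ids
    by_client: HashMap<String, Vec<String>>,  // client_id → sub_ids
}

impl Registry {
    /// Insert into all three indexes so every lookup path stays cheap.
    pub fn register(&mut self, sub_id: &str, client_id: &str, table: &str) {
        self.by_id.insert(sub_id.into(), (client_id.into(), table.into()));
        self.by_table.entry(table.into()).or_default().push(sub_id.into());
        self.by_client.entry(client_id.into()).or_default().push(sub_id.into());
    }

    pub fn subscriptions_for_table(&self, table: &str) -> &[String] {
        self.by_table.get(table).map(Vec::as_slice).unwrap_or(&[])
    }

    /// Bulk-remove every subscription for a disconnecting client,
    /// cleaning up all three indexes.
    pub fn remove_client(&mut self, client_id: &str) {
        for sub_id in self.by_client.remove(client_id).unwrap_or_default() {
            if let Some((_, table)) = self.by_id.remove(&sub_id) {
                if let Some(v) = self.by_table.get_mut(&table) {
                    v.retain(|s| s != &sub_id);
                }
            }
        }
    }
}
```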

Protocol

Wire format for WebSocket messages. Current version: v4.

Client → Server

ClientMessage
├── Auth { token }
├── Subscribe { sub_id, function, args, last_sync_version? }
├── Unsubscribe { sub_id }
└── SyncWrite { request_id, table, operation, doc, function?, args? }

Server → Client

ServerMessage
├── Hello { protocol_version }
├── AuthOk { user_id, expires_at? }
├── AuthError { code, message }
├── SubscribeResponse { sub_id, status, sync_type, data, version }
├── Sync { deltas: Vec<Delta> }
├── SyncWriteResult { request_id, success, result?, error?, rollback? }
└── Error { code, message }
Delta
├── sub_id, table, operation (Insert|Update|Remove|Replace)
├── doc?, _id?, version
└── docs? — for Replace (full snapshot)
AuthErrorCode: TokenInvalid | TokenExpired | TokenMissing | IssuerMismatch | AudienceMismatch | AuthRequired

WebSocket lifecycle (ws.rs)

Client connects to /ws
├── WebSocket upgrade
├── Generate client_id (UUID)
├── Register with Dispatcher
├── Send Hello { protocol_version: 4 }
└── Wait for Auth { token }
    ├── Valid → AuthOk { user_id, expires_at }, store auth_context, enter message loop
    └── Invalid → AuthError { code, message }, close connection
Message loop:
├── Subscribe → execute query, return SubscribeResponse, register sub
├── Unsubscribe → remove sub from registry
└── SyncWrite → execute mutation (JS handler or direct CRUD), return SyncWriteResult, pipeline picks up changes on next poll

HTTP routes

AppState
├── registry: Arc<SubscriptionRegistry>
├── dispatcher: Arc<Dispatcher>
├── db: Arc<Mutex<Database>>
├── schema_registry: Arc<SchemaRegistry>
├── function_registry: Arc<FunctionRegistry>
├── auth_service: Option<Arc<AuthService>>
└── auth_validator: AuthValidator
Routes:
/ws — WebSocket upgrade → handle_socket
/api/schema — POST schema.json → SchemaRegistry
/api/functions — POST functions.json → FunctionRegistry
/auth/signUp — POST → auth_endpoints::sign_up_handler
/auth/signIn — POST → auth_endpoints::sign_in_handler
/auth/signOut — POST → auth_endpoints::sign_out_handler

Database layer (db.rs)

Database
└── conn: rusqlite::Connection
in_memory() — open :memory: + create _valet_changes/_valet_version tables
open(db_path) — open file + create _valet_changes/_valet_version tables
db_version() — SELECT version FROM _valet_version WHERE id = 1
changes_since(version, limit) — SELECT * FROM _valet_changes WHERE db_version > ?
create_change_triggers(table) — create AFTER INSERT/UPDATE/DELETE triggers
drop_change_triggers(table) — drop triggers (for schema migration)
cleanup_changes(up_to_version) — DELETE FROM _valet_changes WHERE db_version <= ?
checkpoint() — PRAGMA wal_checkpoint(TRUNCATE)
conn() — raw connection access
ColumnInfo { name, col_type, notnull, dflt_value, pk }

Each synced table gets three triggers ({table}_valet_insert, {table}_valet_update, {table}_valet_delete) that increment the global version counter and record changes in _valet_changes.
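The exact trigger DDL isn't shown here; a sketch of what create_change_triggers might emit for the insert case (the trigger and bookkeeping-table names come from this doc, but the _valet_changes column names and the version-bump statement are assumptions):

```rust
/// Build the AFTER INSERT trigger DDL for one synced table.
/// Bumps the global counter in _valet_version, then records the change.
pub fn insert_trigger_sql(table: &str) -> String {
    let mut sql = String::new();
    sql.push_str(&format!("CREATE TRIGGER {table}_valet_insert AFTER INSERT ON {table}\n"));
    sql.push_str("BEGIN\n");
    sql.push_str("  UPDATE _valet_version SET version = version + 1 WHERE id = 1;\n");
    // Column names (tbl, row_id, operation, db_version) are illustrative.
    sql.push_str("  INSERT INTO _valet_changes (tbl, row_id, operation, db_version)\n");
    sql.push_str(&format!("  VALUES ('{table}', NEW._id, 'insert', (SELECT version FROM _valet_version WHERE id = 1));\n"));
    sql.push_str("END;");
    sql
}
```

The update and delete variants would differ only in the trigger event, the operation string, and using OLD._id for deletes.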

Document operations (document.rs)

DocumentOps (trait)
├── insert(table, doc) → id — generates UUID, sets _version to 1
├── update(table, id, fields) — partial update, increments _version
├── delete(table, id)
└── get(table, id) → Document
DocumentOpsExt: DocumentOps
└── delete_with_version(table, id, expected_version)
DocumentError
├── NotFound / TableNotFound / AlreadyExists
├── InvalidId / InvalidTableName / InvalidField
├── VersionConflict { expected, actual }
└── Database(String)

Query builder (query.rs)

Query
├── table: String
├── filter: Option<FilterExpr>
├── order_by: Option<OrderBy>
├── limit: Option<usize>
└── offset: Option<usize>
Query::new(table).filter(expr).order_by("name", Asc).limit(10).execute(db)
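SQL generation for that chain can be sketched as follows (filter rendering is omitted; the real builder also compiles FilterExpr into a WHERE clause with bound parameters rather than inlining values):

```rust
/// Minimal query builder mirroring the chain shown above.
pub struct Query {
    table: String,
    order_by: Option<(String, bool)>, // (column, ascending)
    limit: Option<usize>,
    offset: Option<usize>,
}

impl Query {
    pub fn new(table: &str) -> Self {
        Query { table: table.into(), order_by: None, limit: None, offset: None }
    }
    pub fn order_by(mut self, col: &str, asc: bool) -> Self {
        self.order_by = Some((col.into(), asc));
        self
    }
    pub fn limit(mut self, n: usize) -> Self { self.limit = Some(n); self }
    pub fn offset(mut self, n: usize) -> Self { self.offset = Some(n); self }

    /// Render the clauses in SQL's required order.
    pub fn to_sql(&self) -> String {
        let mut sql = format!("SELECT * FROM {}", self.table);
        if let Some((col, asc)) = &self.order_by {
            sql += &format!(" ORDER BY {} {}", col, if *asc { "ASC" } else { "DESC" });
        }
        if let Some(n) = self.limit { sql += &format!(" LIMIT {n}"); }
        if let Some(n) = self.offset { sql += &format!(" OFFSET {n}"); }
        sql
    }
}
```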

Schema registry

Stores table configurations and backfill definitions received from codegen.

SchemaRegistry
├── tables: RwLock<HashMap<String, TableSyncConfig>>
└── backfills: RwLock<HashMap<String, Vec<BackfillDef>>>
TableSyncConfig
├── mode: String — "full" | "none"
├── filter: Option<FilterExpr> — table-level sync filter
└── fields: Vec<FieldDef>
FieldDef { name, field_type, optional }
BackfillDef { table, column, code, param_name }
register_table(name, config)
get_table_config(name)
register_backfill(def)
get_backfills(table)
has_sync_filter(table) → bool
apply_sync_filter(table, filter) — ANDs table filter with subscription filter
Table creation: CREATE TABLE + create_change_triggers()
Table migration: drop_change_triggers() → ALTER TABLE → create_change_triggers()
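The AND-combination in apply_sync_filter is a one-liner over the filter tree. A sketch with a reduced Filter enum (the real function works on FilterExpr; skipping the combination when the table filter is trivial is an assumed optimization):

```rust
#[derive(Clone, Debug, PartialEq)]
pub enum Filter { True, Eq(String, String), And(Vec<Filter>) }

/// AND the table-level sync filter (if any) with the subscription's
/// own filter, so docs must pass both to sync.
pub fn apply_sync_filter(table_filter: Option<Filter>, sub_filter: Filter) -> Filter {
    match table_filter {
        // No table filter (or a trivially-true one): subscription filter alone.
        None | Some(Filter::True) => sub_filter,
        Some(tf) => Filter::And(vec![tf, sub_filter]),
    }
}
```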

JavaScript runtime (js_runtime.rs)

Executes user-defined query and mutation handlers using boa_engine.

FunctionRegistry
└── functions: Arc<RwLock<HashMap<String, RegisteredFunction>>>
RegisteredFunction { name, func_type, code }
FunctionType: Query | Mutation
register(func)
get(name) → Option<RegisteredFunction>
list_by_type(func_type) → Vec<RegisteredFunction>
Execution:
Handlers are dispatched through ws.rs, which creates a boa_engine JS context per invocation and exposes:
├── ctx.db.insert / update / delete / get / query
└── ctx.auth (if authenticated)
then executes the handler code and awaits any returned promise.
execute_transform_handler(code, row, db, auth_context)
└── Standalone utility for schema migration column transforms
JsExecutionError
├── EvalError / RuntimeError / ConversionError
├── DatabaseError
└── FunctionNotFound

Known limitation: ctx.db.query(table).filter(predicate).collect() — the .filter() method is currently a no-op that returns this for chaining compatibility. .collect() fetches all rows from the table. Server-side query filtering is done via FilterExpr in the subscription system, not in the JS query builder. All query operations (filter, order, limit, offset) currently execute in-memory after fetching all rows.


Authentication

JWT validation (auth.rs)

AuthConfig
├── Development — trust any token
└── HmacSecret { secret, issuer?, audience? }
AuthValidator
└── config: AuthConfig
validate(token) → ValidationResult
├── Development: decode without verification, extract sub
└── HmacSecret: verify signature, check exp/iss/aud
ValidationResult
├── Ok { auth_context, user_id, expires_at }
├── Expired
├── Invalid(reason)
├── IssuerMismatch
└── AudienceMismatch
AuthContext { context: Value }
context shape: { "auth": { "userId": "...", "claims": {...} } }
JwtClaims { sub, exp?, iat?, iss?, aud?, extra }

Config source: VALET_AUTH_SECRET, VALET_AUTH_ISSUER, VALET_AUTH_AUDIENCE env vars. If VALET_AUTH_SECRET is unset, runs in development mode.
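The mode selection can be factored into a pure function for testing (auth_config is a hypothetical helper; in practice the inputs would come from std::env::var("VALET_AUTH_SECRET").ok() and friends):

```rust
#[derive(Debug, PartialEq)]
pub enum AuthConfig {
    Development,
    HmacSecret { secret: String, issuer: Option<String>, audience: Option<String> },
}

/// Choose the auth mode from the (already-read) env values:
/// no secret means development mode, where tokens are trusted.
pub fn auth_config(secret: Option<String>, issuer: Option<String>, audience: Option<String>) -> AuthConfig {
    match secret {
        None => AuthConfig::Development,
        Some(secret) => AuthConfig::HmacSecret { secret, issuer, audience },
    }
}
```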

Auth service (auth_service.rs)

Built-in email/password auth. Only created if VALET_AUTH_SECRET is set.

AuthService
└── secret: String (for signing JWTs)
sign_up(db, email, password, name?) → AuthResult
sign_in(db, email, password) → AuthResult
sign_out(db, session_id)
get_session_user(db, session_id) → Option<AuthUser>
AuthResult { user: AuthUser, tokens: AuthTokens }
AuthUser { id, email, name?, email_verified }
AuthTokens { token, expires_at }
AuthError: EmailAlreadyExists | InvalidCredentials | SessionInvalid | PasswordTooShort | EmailRequired | InvalidEmail | Database | Hashing | JwtEncoding

Auth tables (auth_tables.rs)

Four tables, created on startup (no change triggers — not synced):

_auth_users (_id, email UNIQUE, name, email_verified, created_at, updated_at)
_auth_accounts (_id, user_id FK, provider, provider_account_id, hashed_password, created_at)
_auth_sessions (_id, user_id FK, expires_at, created_at, last_active_at)
_auth_verification_codes (_id, user_id FK, code_hash, type, expires_at, created_at, used)

Error types (error.rs)

Error
├── Database(rusqlite::Error)
├── WebSocket(String)
├── Serialization(serde_json::Error)
├── SubscriptionNotFound(String)
├── ClientNotFound(String)
├── InvalidMessage(String)
└── Internal(String)

Concurrency model

Arc<Mutex<Database>> — ws handlers + pipeline poller (rusqlite is !Sync)
RwLock — SubscriptionRegistry, FunctionRegistry, SchemaRegistry, Dispatcher
broadcast::channel — Tailer → pipeline (multi-receiver change feed)
mpsc::channel — WebSocket message sending to clients

The pipeline poller is a single tokio::spawn task that calls tailer.poll_and_broadcast(db) every 50ms. WebSocket handlers and the pipeline both acquire the database mutex, so writes from mutations are immediately visible to the next poll.


Inputs and outputs

Inputs:

  • WebSocket messages from Valet clients (auth, subscribe, sync writes)
  • HTTP POST of schema.json from codegen (table definitions, sync configs, backfills)
  • HTTP POST of functions.json from codegen (query/mutation handler code)
  • HTTP POST auth requests (sign-up, sign-in, sign-out)
  • Environment variables for auth configuration

Outputs:

  • WebSocket messages to clients (auth responses, subscription data, sync deltas, write results)
  • SQLite database mutations (trigger-tracked tables)
  • JavaScript execution results from boa_engine
