Store

@noocodex/dagonizer/store

The store module provides the shared key-value store contract and its implementations. Stores live in the services bag and survive scatter clone boundaries within a run. Checkpoint integration snapshots named stores alongside parent state for deterministic resume.

```ts
import { BaseStore, MemoryStore, StoreError } from '@noocodex/dagonizer/store';
import type { Snapshottable, Store, StoreSnapshot, StoreSnapshotEntry } from '@noocodex/dagonizer/contracts';
```

Interface: Snapshottable

@noocodex/dagonizer/contracts

The capability checkpointing depends on: a named container that serializes itself to a StoreSnapshot and rehydrates from one. It declares only two methods.

```ts
interface Snapshottable {
  snapshot(): Promise<StoreSnapshot>;
  restore(snapshot: StoreSnapshot): Promise<void>;
}
```

| Method | Returns | Description |
| --- | --- | --- |
| `snapshot()` | `Promise<StoreSnapshot>` | Capture the entire state as a typed envelope. |
| `restore(snapshot)` | `Promise<void>` | Repopulate from a snapshot. Implementations validate type and version before applying entries. |

Snapshottable is decoupled from the key-value surface on purpose. Checkpoint.capture(dag, result, { stores }) and Checkpoint.restoreStores(map) take Record<string, Snapshottable>, so a non-KV backing (an RDF triple store, a vector index, an append-only projection) can ride along in a checkpoint without implementing get/set/has/delete/update. Store extends Snapshottable, so every Store is also Snapshottable. The StoreSnapshot / StoreSnapshotEntry envelopes live with this capability.
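To make the decoupling concrete, here is a minimal sketch of a non-KV container that satisfies Snapshottable. EventLog, its append/list methods, and the 'event-log' type string are hypothetical. The type declarations are local stand-ins for the ones exported from @noocodex/dagonizer/contracts (the JsonValue definition in particular is an assumption), and a plain Error stands in for StoreError.

```typescript
// Local stand-ins mirroring the documented contracts (assumed shapes).
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };
interface StoreSnapshotEntry { readonly key: string; readonly value: JsonValue; }
interface StoreSnapshot {
  readonly version: number;
  readonly type: string;
  readonly entries: readonly StoreSnapshotEntry[];
}
interface Snapshottable {
  snapshot(): Promise<StoreSnapshot>;
  restore(snapshot: StoreSnapshot): Promise<void>;
}

// Hypothetical append-only log: no get/set/has/delete/update surface at all.
class EventLog implements Snapshottable {
  private events: string[] = [];

  append(event: string): void {
    this.events.push(event);
  }

  list(): readonly string[] {
    return [...this.events];
  }

  async snapshot(): Promise<StoreSnapshot> {
    // The position becomes the key so that order survives the round trip.
    const entries = this.events.map((value, index) => ({ key: String(index), value }));
    return { version: 1, type: 'event-log', entries };
  }

  async restore(snapshot: StoreSnapshot): Promise<void> {
    // Validate type and version before applying entries.
    if (snapshot.type !== 'event-log' || snapshot.version !== 1) {
      throw new Error(`incompatible snapshot: ${snapshot.type}@${snapshot.version}`);
    }
    this.events = snapshot.entries.map((entry) => String(entry.value));
  }
}

// Capture one log and rehydrate a second one from the envelope.
async function roundTrip(): Promise<readonly string[]> {
  const source = new EventLog();
  source.append('started');
  source.append('finished');

  const copy = new EventLog();
  await copy.restore(await source.snapshot());
  return copy.list();
}
```

Because the class only promises snapshot() and restore(), it could be passed wherever a Record<string, Snapshottable> is expected without pretending to be a key-value store.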


Interface: Store

@noocodex/dagonizer/contracts

Shared key-value store contract, extending Snapshottable. Every method returns a Promise. There is no sync variant; always await store calls.

Values are typed per-call via the method's <T> parameter. There is no class-level value generic. A Store instance can hold heterogeneous values under different keys; type narrowing happens at the call site.

```ts
interface Store extends Snapshottable {
  get<T extends JsonValue>(key: string): Promise<T | undefined>;
  set<T extends JsonValue>(key: string, value: T): Promise<void>;
  has(key: string): Promise<boolean>;
  delete(key: string): Promise<boolean>;
  update<T extends JsonValue>(key: string, fn: (current: T | undefined) => T): Promise<T>;
  // snapshot() / restore() inherited from Snapshottable.
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}
```

| Method | Returns | Description |
| --- | --- | --- |
| `get(key)` | `Promise<T \| undefined>` | Return the value at key, or undefined when absent. |
| `set(key, value)` | `Promise<void>` | Write value at key. Last-write-wins. |
| `has(key)` | `Promise<boolean>` | Return true when the key exists. |
| `delete(key)` | `Promise<boolean>` | Remove the key. Returns true when the key existed. |
| `update(key, fn)` | `Promise<T>` | Atomic read-modify-write. fn receives the current value (or undefined) and returns the new value. Implementations are responsible for atomicity. |
| `snapshot()` / `restore(snapshot)` | inherited | From Snapshottable: capture / repopulate the whole store. |
| `connect()` | `Promise<void>` | Optional lifecycle hook for stores that hold a connection. |
| `disconnect()` | `Promise<void>` | Optional lifecycle hook for stores that hold a connection. |

Concurrency: update(key, fn) is atomic within a single store instance, and each implementation is responsible for delivering that guarantee; see the update note on BaseStore for the requirement. A get followed by a set is not atomic: another caller can write to the same key between the two calls.
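The difference can be illustrated with a small model. The sketch below is not the library's MemoryStore: TinyStore, incrementWithGetSet, and compare are hypothetical names, and the JsonValue alias is an assumed stand-in for the package's type. It runs two concurrent increments through a get-then-set pair and through update.

```typescript
// Assumed stand-in for the package's JsonValue type.
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };

// Minimal in-memory model of the documented get/set/update surface.
class TinyStore {
  private readonly data = new Map<string, JsonValue>();

  async get<T extends JsonValue>(key: string): Promise<T | undefined> {
    return this.data.get(key) as T | undefined;
  }

  async set<T extends JsonValue>(key: string, value: T): Promise<void> {
    this.data.set(key, value);
  }

  // Read and write happen in one synchronous step, so nothing can interleave.
  async update<T extends JsonValue>(key: string, fn: (current: T | undefined) => T): Promise<T> {
    const next = fn(this.data.get(key) as T | undefined);
    this.data.set(key, next);
    return next;
  }
}

// Racy: both callers can read the same value before either one writes.
async function incrementWithGetSet(store: TinyStore, key: string): Promise<void> {
  const current = (await store.get<number>(key)) ?? 0;
  await store.set<number>(key, current + 1);
}

async function compare(): Promise<{ racy: number | undefined; atomic: number | undefined }> {
  const racyStore = new TinyStore();
  await Promise.all([incrementWithGetSet(racyStore, 'n'), incrementWithGetSet(racyStore, 'n')]);

  const atomicStore = new TinyStore();
  const bump = (n: number | undefined): number => (n ?? 0) + 1;
  await Promise.all([atomicStore.update<number>('n', bump), atomicStore.update<number>('n', bump)]);

  // In this model: racy resolves to 1 (one increment lost), atomic to 2.
  return { racy: await racyStore.get<number>('n'), atomic: await atomicStore.get<number>('n') };
}
```

In the racy path both callers suspend at their first await having read undefined, so the second write overwrites the first with the same value.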


Interface: StoreSnapshot

@noocodex/dagonizer/contracts

Versioned snapshot envelope returned by Snapshottable.snapshot() and consumed by Snapshottable.restore().

```ts
interface StoreSnapshot {
  readonly version: number;
  readonly type:    string;
  readonly entries: readonly StoreSnapshotEntry[];
}
```

| Field | Description |
| --- | --- |
| `version` | Snapshot schema version. Plugin authors increment this when the storage shape changes incompatibly. BaseStore.restore rejects mismatches with StoreError(INCOMPATIBLE_SNAPSHOT). |
| `type` | Stable identifier for the store implementation (e.g. 'memory-store'). Set via BaseStore.snapshotType. |
| `entries` | Ordered list of key-value pairs at capture time. |

Interface: StoreSnapshotEntry

@noocodex/dagonizer/contracts

A single entry in a StoreSnapshot.

```ts
interface StoreSnapshotEntry {
  readonly key:   string;
  readonly value: JsonValue;
}
```

Keys in the snapshot carry the namespace prefix when a namespace is configured. Restore feeds entries directly back through performRestoreEntries; no prefix stripping is applied. Restore into a store with the same namespace used at capture time.
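The consequence of restoring into a different namespace can be seen in a small synchronous model. This is a sketch of the behavior described above, not the BaseStore implementation: NamespacedModel and its methods are hypothetical, the real API is Promise-based, and the JsonValue alias is an assumed stand-in.

```typescript
// Assumed stand-ins for the package's types.
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };
interface StoreSnapshotEntry { readonly key: string; readonly value: JsonValue; }

// Hypothetical model: keys are qualified on the way in, snapshot entries
// expose the qualified keys, and restore applies entries verbatim.
class NamespacedModel {
  private data = new Map<string, JsonValue>();

  constructor(private readonly prefix: string) {}

  private qualify(key: string): string {
    return `${this.prefix}:${key}`;
  }

  set(key: string, value: JsonValue): void {
    this.data.set(this.qualify(key), value);
  }

  get(key: string): JsonValue | undefined {
    return this.data.get(this.qualify(key));
  }

  entries(): StoreSnapshotEntry[] {
    return Array.from(this.data, ([key, value]) => ({ key, value }));
  }

  // Clear first, then apply; no prefix stripping.
  restoreEntries(entries: readonly StoreSnapshotEntry[]): void {
    this.data = new Map(entries.map((e): [string, JsonValue] => [e.key, e.value]));
  }
}

const source = new NamespacedModel('run-a');
source.set('cursor', 42);
const entries = source.entries(); // one entry, keyed 'run-a:cursor'

const sameNamespace = new NamespacedModel('run-a');
sameNamespace.restoreEntries(entries);

const otherNamespace = new NamespacedModel('run-b');
otherNamespace.restoreEntries(entries);

console.log(sameNamespace.get('cursor'));  // 42
console.log(otherNamespace.get('cursor')); // undefined: it looks up 'run-b:cursor'
```

In the mismatched case the data is present in the backing map but unreachable through the public surface, which is why the namespace at restore time should match the one used at capture time.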


Class: BaseStore

@noocodex/dagonizer/store

Abstract base class every concrete store extends. Owns the snapshot envelope, the update default, optional namespace prefix, and lifecycle no-ops. Concrete stores implement the protected abstract hooks listed below.

```ts
import { BaseStore, type BaseStoreOptions } from '@noocodex/dagonizer/store';

abstract class BaseStore implements Store {
  protected constructor(options?: BaseStoreOptions);
}
```

BaseStoreOptions

```ts
interface BaseStoreOptions {
  readonly namespace?: string;
}
```

namespace is an optional key prefix. When set, every key passed to a public method is qualified as ${namespace}:${key} before it reaches the perform* hooks. Two stores with different namespaces can share the same physical backing without collisions.

Public methods

All public methods delegate to the perform* hooks after qualifying the key.

| Method | Description |
| --- | --- |
| `get(key)` | Delegates to `performGet(qualifiedKey)`. |
| `set(key, value)` | Delegates to `performSet(qualifiedKey, value)`. |
| `has(key)` | Delegates to `performHas(qualifiedKey)`. |
| `delete(key)` | Delegates to `performDelete(qualifiedKey)`. |
| `update(key, fn)` | Default: `performGet` → `fn(current)` → `performSet`. Two await points, so not atomic on its own. Subclasses must override when the backing supports a single-step RMW (in-memory direct access, SQL transactions, Redis WATCH/MULTI, etc.). The default is a fallback that is only safe when no concurrent calls touch the same key. |
| `snapshot()` | Calls `performSnapshotEntries()`, then wraps the result in `{ version: snapshotVersion, type: snapshotType, entries }`. |
| `restore(snapshot)` | Validates `snapshot.type` and `snapshot.version`; throws StoreError(INCOMPATIBLE_SNAPSHOT) on mismatch. On match, calls `performRestoreEntries(entries)`. |
| `connect()` | No-op default. Override for connection lifecycle. |
| `disconnect()` | No-op default. Override for connection lifecycle. |

Protected abstract hooks

Plugin authors implement these six methods and two accessors. All keyed arguments receive the qualified key (namespace prefix already applied).

| Hook | Signature | Description |
| --- | --- | --- |
| `snapshotType` | `get snapshotType(): string` | Stable identifier written into every snapshot envelope. |
| `snapshotVersion` | `get snapshotVersion(): number` | Schema version. Increment on incompatible shape change. |
| `performGet` | `(qualifiedKey: string) → Promise<T \| undefined>` | Read a single value. |
| `performSet` | `(qualifiedKey: string, value: T) → Promise<void>` | Write a single value. |
| `performHas` | `(qualifiedKey: string) → Promise<boolean>` | Check existence. |
| `performDelete` | `(qualifiedKey: string) → Promise<boolean>` | Remove key; return true when it existed. |
| `performSnapshotEntries` | `() → Promise<readonly StoreSnapshotEntry[]>` | Return all entries for the snapshot. |
| `performRestoreEntries` | `(entries: readonly StoreSnapshotEntry[]) → Promise<void>` | Repopulate from entries (clear first, then apply). |

Protected utility

| Member | Description |
| --- | --- |
| `qualifyKey(key)` | Apply the namespace prefix. Call this in update overrides that bypass the default RMW path. |
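The division of labor between the base class and a plugin can be sketched with a trimmed local stand-in. SketchBaseStore below is not the library's BaseStore: it is an assumed reduction to get, set, update, and qualifyKey so the example compiles on its own. MapStore and countTo are hypothetical, and the JsonValue alias is an assumption.

```typescript
// Assumed stand-in for the package's JsonValue type.
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };

// Trimmed model of the documented base class (snapshot, restore, has,
// delete, and the lifecycle hooks are omitted here).
abstract class SketchBaseStore {
  constructor(private readonly prefix?: string) {}

  protected qualifyKey(key: string): string {
    return this.prefix === undefined ? key : `${this.prefix}:${key}`;
  }

  get<T extends JsonValue>(key: string): Promise<T | undefined> {
    return this.performGet<T>(this.qualifyKey(key));
  }

  set<T extends JsonValue>(key: string, value: T): Promise<void> {
    return this.performSet<T>(this.qualifyKey(key), value);
  }

  // Fallback read-modify-write: two await points, so not atomic by itself.
  async update<T extends JsonValue>(key: string, fn: (current: T | undefined) => T): Promise<T> {
    const qualified = this.qualifyKey(key);
    const next = fn(await this.performGet<T>(qualified));
    await this.performSet<T>(qualified, next);
    return next;
  }

  protected abstract performGet<T extends JsonValue>(qualifiedKey: string): Promise<T | undefined>;
  protected abstract performSet<T extends JsonValue>(qualifiedKey: string, value: T): Promise<void>;
}

// Hypothetical Map-backed plugin.
class MapStore extends SketchBaseStore {
  private readonly data = new Map<string, JsonValue>();

  protected async performGet<T extends JsonValue>(qualifiedKey: string): Promise<T | undefined> {
    return this.data.get(qualifiedKey) as T | undefined;
  }

  protected async performSet<T extends JsonValue>(qualifiedKey: string, value: T): Promise<void> {
    this.data.set(qualifiedKey, value);
  }

  // Overrides the fallback: the whole read-modify-write is synchronous.
  // The override bypasses the default path, so it calls qualifyKey itself.
  async update<T extends JsonValue>(key: string, fn: (current: T | undefined) => T): Promise<T> {
    const qualified = this.qualifyKey(key);
    const next = fn(this.data.get(qualified) as T | undefined);
    this.data.set(qualified, next);
    return next;
  }
}

// Fire `times` concurrent increments and read back the counter.
async function countTo(store: SketchBaseStore, times: number): Promise<number | undefined> {
  const bump = (n: number | undefined): number => (n ?? 0) + 1;
  await Promise.all(Array.from({ length: times }, () => store.update<number>('counter', bump)));
  return store.get<number>('counter');
}
```

With the override in place, countTo(new MapStore('demo'), 5) resolves to 5 in this model. Removing the override would route every call through the two-await fallback, where concurrent increments can overwrite each other.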

Class: MemoryStore

@noocodex/dagonizer/store

Reference implementation of BaseStore backed by a Map.

```ts
import { MemoryStore } from '@noocodex/dagonizer/store';

const store = new MemoryStore();
await store.set<string>('greeting', 'hello');
const v = await store.get<string>('greeting'); // 'hello'
```

Constructor

```ts
new MemoryStore(options?: BaseStoreOptions)
```

Accepts the same BaseStoreOptions as BaseStore (namespace prefix).

Snapshot type and version

| Field | Value |
| --- | --- |
| `snapshotType` | `'memory-store'` |
| `snapshotVersion` | `1` |

Atomic update

MemoryStore overrides update to access #data directly without any intermediate await. Because the body contains no yield point, no concurrent microtask can interleave between the read and the write; the read-modify-write is atomic within the store instance.

```ts
// Concurrent updates produce no lost writes.
await Promise.all([
  store.update<number>('counter', (n) => (n ?? 0) + 1),
  store.update<number>('counter', (n) => (n ?? 0) + 1),
]);
const v = await store.get<number>('counter'); // → 2
```

Class: StoreError

@noocodex/dagonizer/store

Error class for store operations. Carries a structured classification object so callers discriminate by reason without instanceof chains.

```ts
import { StoreError } from '@noocodex/dagonizer/store';

try {
  await store.restore(incompatibleSnapshot);
} catch (err) {
  if (err instanceof StoreError && err.classification.reason === 'INCOMPATIBLE_SNAPSHOT') {
    // err.classification.expectedType, .actualType, .expectedVersion, .actualVersion
  }
}
```

StoreErrorClassification

```ts
type StoreErrorClassification =
  | {
      readonly reason:           'INCOMPATIBLE_SNAPSHOT';
      readonly expectedType:     string;
      readonly actualType:       string;
      readonly expectedVersion:  number;
      readonly actualVersion:    number;
    }
  | {
      readonly reason: 'KEY_NOT_FOUND';
      readonly key:    string;
    }
  | {
      readonly reason: 'BACKING_ERROR';
      readonly cause:  Error;
    }
  | {
      readonly reason:  'LEASE_DENIED';
      readonly subject: string;
      readonly holder:  string;
    }
  | {
      readonly reason:  'LEASE_EXPIRED';
      readonly subject: string;
      readonly token:   string;
    }
  | {
      readonly reason:    'UNREACHABLE';
      readonly endpoint:  string;
      readonly cause:     Error;
    };
```

| Reason | When | Extra fields |
| --- | --- | --- |
| `INCOMPATIBLE_SNAPSHOT` | restore() called with wrong type or version | `expectedType`, `actualType`, `expectedVersion`, `actualVersion` |
| `KEY_NOT_FOUND` | Plugin author throws when a required key is absent | `key` |
| `BACKING_ERROR` | Plugin author wraps a backing-level failure | `cause` |
| `LEASE_DENIED` | acquireLease finds an active holder and maxWaitMs expires before it releases | `subject`, `holder` |
| `LEASE_EXPIRED` | A write or release is attempted with a token that has already expired | `subject`, `token` |
| `UNREACHABLE` | Transport failure (endpoint does not respond within the health budget) | `endpoint`, `cause` |

BaseStore throws INCOMPATIBLE_SNAPSHOT automatically on type/version mismatch. KEY_NOT_FOUND and BACKING_ERROR are available for plugin authors to classify errors from their backing stores. LEASE_DENIED, LEASE_EXPIRED, and UNREACHABLE are for RemoteStore implementations.
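Because reason is a literal discriminant, a switch narrows each branch to its own extra fields. The sketch below copies the documented union locally so it compiles on its own; describeStoreError and its message wording are hypothetical, not part of the package.

```typescript
// Local copy of the documented union.
type StoreErrorClassification =
  | {
      readonly reason: 'INCOMPATIBLE_SNAPSHOT';
      readonly expectedType: string;
      readonly actualType: string;
      readonly expectedVersion: number;
      readonly actualVersion: number;
    }
  | { readonly reason: 'KEY_NOT_FOUND'; readonly key: string }
  | { readonly reason: 'BACKING_ERROR'; readonly cause: Error }
  | { readonly reason: 'LEASE_DENIED'; readonly subject: string; readonly holder: string }
  | { readonly reason: 'LEASE_EXPIRED'; readonly subject: string; readonly token: string }
  | { readonly reason: 'UNREACHABLE'; readonly endpoint: string; readonly cause: Error };

// Hypothetical helper: turn a classification into a log line.
function describeStoreError(c: StoreErrorClassification): string {
  switch (c.reason) {
    case 'INCOMPATIBLE_SNAPSHOT':
      return `snapshot ${c.actualType}@${c.actualVersion} does not match ${c.expectedType}@${c.expectedVersion}`;
    case 'KEY_NOT_FOUND':
      return `missing key ${c.key}`;
    case 'BACKING_ERROR':
      return `backing failure: ${c.cause.message}`;
    case 'LEASE_DENIED':
      return `lease on ${c.subject} is held by ${c.holder}`;
    case 'LEASE_EXPIRED':
      return `lease token ${c.token} for ${c.subject} has expired`;
    case 'UNREACHABLE':
      return `endpoint ${c.endpoint} is unreachable: ${c.cause.message}`;
    default: {
      // Compile-time exhaustiveness check: a new reason breaks this assignment.
      const unhandled: never = c;
      return unhandled;
    }
  }
}

console.log(describeStoreError({ reason: 'KEY_NOT_FOUND', key: 'cursor' })); // missing key cursor
```

The never assignment in the default branch means that a reason added to the union later fails type checking here until it is handled.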


Interface: RemoteStore

@noocodex/dagonizer/contracts

Extension of Store for distributed or network-backed implementations. Plugins that talk over HTTP, gRPC, or WebSocket, or that replicate state across processes, implement RemoteStore rather than Store directly. Single-process and single-node-durable stores implement Store directly.

```ts
import type { RemoteStore, RemoteStoreEndpoint, RemoteStoreLease } from '@noocodex/dagonizer/contracts';
```

```ts
interface RemoteStore extends Store {
  readonly endpoint: RemoteStoreEndpoint;
  acquireLease(subject: string, ttlMs: number, maxWaitMs: number): Promise<RemoteStoreLease>;
  releaseLease(lease: RemoteStoreLease): Promise<void>;
  health(timeoutMs: number): Promise<boolean>;
}
```

The engine consumes a RemoteStore through the Store surface. The extra methods are observability and coordination primitives the dispatcher uses when distributed execution is wired in.

Interface: RemoteStoreEndpoint

```ts
interface RemoteStoreEndpoint {
  readonly url:    string;
  readonly region: string;
}
```

| Field | Description |
| --- | --- |
| `url` | Stable identifier for the remote endpoint (URL, gRPC target, etc.). |
| `region` | Region/zone hint for placement decisions. Default at construction: '' (no region constraint). |

region is required. Implementations that have no region concept supply ''.

Interface: RemoteStoreLease

```ts
interface RemoteStoreLease {
  readonly token:     string;
  readonly expiresAt: number;
  readonly subject:   string;
}
```

Lease record returned by acquireLease. Consumers treat token as opaque; the store validates it on releaseLease and, when leasing is enforced, on writes.

| Field | Description |
| --- | --- |
| `token` | Opaque string the store recognises on releaseLease and write checks. |
| `expiresAt` | Monotonic ms timestamp the lease expires at (exclusive). |
| `subject` | Scope of the lease (e.g. a key namespace or DAG run id). |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `endpoint` | `RemoteStoreEndpoint` | Endpoint descriptor; surfaces in observability and placement decisions. |
| `acquireLease(subject, ttlMs, maxWaitMs)` | `Promise<RemoteStoreLease>` | Acquire exclusive write authority for subject with a lifetime of ttlMs ms. Waits up to maxWaitMs for an active holder to release before throwing StoreError(LEASE_DENIED). |
| `releaseLease(lease)` | `Promise<void>` | Release a previously-acquired lease. Idempotent: releasing an already-expired lease is a no-op. |
| `health(timeoutMs)` | `Promise<boolean>` | Health probe. Returns true when the endpoint is reachable and the backing responds within timeoutMs. Implementations must not throw on transport failure: return false so the dispatcher can route around an unhealthy store. |
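One way a caller might pair acquireLease with releaseLease is a try/finally wrapper, sketched below. withLease, LeaseCapable, and FakeLeases are hypothetical names. LeaseCapable is only the two lease methods of the documented interface, and the fake is a stand-in for a real RemoteStore: it denies immediately instead of waiting maxWaitMs, and it throws a plain Error rather than StoreError(LEASE_DENIED).

```typescript
// Local copy of the documented lease shape.
interface RemoteStoreLease {
  readonly token: string;
  readonly expiresAt: number;
  readonly subject: string;
}

// Only the lease-related slice of RemoteStore is needed for this sketch.
interface LeaseCapable {
  acquireLease(subject: string, ttlMs: number, maxWaitMs: number): Promise<RemoteStoreLease>;
  releaseLease(lease: RemoteStoreLease): Promise<void>;
}

// Hypothetical helper: run `work` under a lease and always release it,
// whether `work` resolves or rejects.
async function withLease<R>(
  store: LeaseCapable,
  subject: string,
  ttlMs: number,
  maxWaitMs: number,
  work: (lease: RemoteStoreLease) => Promise<R>,
): Promise<R> {
  const lease = await store.acquireLease(subject, ttlMs, maxWaitMs);
  try {
    return await work(lease);
  } finally {
    await store.releaseLease(lease);
  }
}

// In-memory fake used only to exercise the helper.
class FakeLeases implements LeaseCapable {
  readonly held = new Set<string>();
  private issued = 0;
  private readonly clock = 0; // fake clock fixed at 0 ms

  async acquireLease(subject: string, ttlMs: number, _maxWaitMs: number): Promise<RemoteStoreLease> {
    if (this.held.has(subject)) {
      throw new Error(`lease denied for ${subject}`);
    }
    this.held.add(subject);
    this.issued += 1;
    return { token: `lease-${this.issued}`, expiresAt: this.clock + ttlMs, subject };
  }

  async releaseLease(lease: RemoteStoreLease): Promise<void> {
    this.held.delete(lease.subject);
  }
}
```

The finally block matters because a lease left held after a failed write would block other writers until ttlMs elapses.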

Implementing RemoteStore

Extend BaseStore and implement the three additional methods plus the endpoint property:

```ts
<<< @/../examples/dags/store-remote.ts#remote-store
```
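Of the three methods, health has the least obvious contract: it must answer within a budget and must report failure instead of throwing. A minimal sketch of that behavior, independent of any transport, could look like this. healthWithin and its probe parameter are hypothetical; probe stands in for whatever round trip the backing offers.

```typescript
// Hypothetical helper: resolve true if `probe` completes within `timeoutMs`,
// false on timeout or on any failure. It never rejects.
async function healthWithin(probe: () => Promise<void>, timeoutMs: number): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<boolean>((resolve) => {
    timer = setTimeout(() => resolve(false), timeoutMs);
  });
  try {
    // Whichever settles first wins: the probe or the timeout.
    return await Promise.race([probe().then(() => true), timeout]);
  } catch {
    // Transport failure is reported as unhealthy, not thrown.
    return false;
  } finally {
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

A health(timeoutMs) implementation could delegate to a helper of this shape, passing its own ping or no-op request as the probe.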

Class: TypedStore<Schema>

@noocodex/dagonizer/store

Schema-narrowed wrapper over any Store. Constrains keys to the declared Schema and infers the value type from Schema[K]. Callers never specify <T> at the call site.

TypedStore does not implement the Store contract (its set signature is narrower). Use .inner to access the underlying Store when you need the wider, heterogeneous contract.

```ts
<<< @/../examples/the-archivist/memory/TypedRunStore.ts#typed-store
```

Constructor

```ts
new TypedStore<Schema extends Record<string, JsonValue>>(inner: Store)
```

Schema must be a Record<string, JsonValue>: every value type must be JSON-serializable.

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `get(key)` | `Promise<Schema[K] \| undefined>` | Return the value at key, type inferred from Schema[K]. |
| `set(key, value)` | `Promise<void>` | Write value at key. value must be Schema[K]. |
| `has(key)` | `Promise<boolean>` | Return true when the key exists. |
| `delete(key)` | `Promise<boolean>` | Remove the key. Returns true when the key existed. |
| `update(key, fn)` | `Promise<Schema[K]>` | Atomic read-modify-write. fn receives Schema[K] \| undefined, returns Schema[K]. |
| `snapshot()` | `Promise<StoreSnapshot>` | Pass-through to the underlying Store. |
| `restore(snapshot)` | `Promise<void>` | Pass-through to the underlying Store. |
| `connect()` | `Promise<void>` | Pass-through to the underlying Store. |
| `disconnect()` | `Promise<void>` | Pass-through to the underlying Store. |
| `.inner` | `Store` | The underlying Store instance for un-narrowed operations. |

All key parameters are constrained to keyof Schema & string. TypeScript rejects keys absent from the schema and values of the wrong type at compile time.
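The narrowing technique itself is small enough to sketch. The code below is a re-creation of the idea under stated assumptions, not the library's TypedStore: NarrowedStore, KeyValue, MapBacking, RunSchema, and demo are hypothetical, only get and set are modeled, and the JsonValue alias is an assumed stand-in.

```typescript
// Assumed stand-in for the package's JsonValue type.
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };

// The slice of the documented Store contract this sketch relies on.
interface KeyValue {
  get<T extends JsonValue>(key: string): Promise<T | undefined>;
  set<T extends JsonValue>(key: string, value: T): Promise<void>;
}

// Keys are limited to the schema; the value type follows from Schema[K].
class NarrowedStore<Schema extends Record<string, JsonValue>> {
  constructor(readonly inner: KeyValue) {}

  get<K extends keyof Schema & string>(key: K): Promise<Schema[K] | undefined> {
    return this.inner.get<Schema[K]>(key);
  }

  set<K extends keyof Schema & string>(key: K, value: Schema[K]): Promise<void> {
    return this.inner.set<Schema[K]>(key, value);
  }
}

// Map-backed KeyValue used only to exercise the wrapper.
class MapBacking implements KeyValue {
  private readonly data = new Map<string, JsonValue>();

  async get<T extends JsonValue>(key: string): Promise<T | undefined> {
    return this.data.get(key) as T | undefined;
  }

  async set<T extends JsonValue>(key: string, value: T): Promise<void> {
    this.data.set(key, value);
  }
}

// A type alias (not an interface) so it satisfies Record<string, JsonValue>.
type RunSchema = { cursor: number; label: string };

async function demo(): Promise<number | undefined> {
  const run = new NarrowedStore<RunSchema>(new MapBacking());
  await run.set('cursor', 7); // value must be a number
  // await run.set('cursor', 'seven'); // compile error: string is not number
  // await run.get('missing');         // compile error: key not in RunSchema
  return run.get('cursor'); // typed as Promise<number | undefined>
}
```

The call sites carry no explicit type arguments; K is inferred from the key literal and the value type follows from the schema.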