Phase 10: Shared state
A MemoryStore lives on the services bag, threaded through every node and every scatter clone. Parent and child append entries to the same store without passing values through inputs or gather. Checkpoint.capture snapshots the store alongside the parent state; Checkpoint.load + restoreStores restores it on resume.
Code
Services bag with a Store
The services bag declares one field of type Store. The dispatcher binds a concrete MemoryStore instance at construction time; every node sees the same instance via context.services.log:
export interface Services {
log: Store;
}Child DAG
The child DAG runs a single child-step placement; it never references the store directly. The store lives outside the topology:
export const childDag = new DAGBuilder('sub-flow', '1')
.node('child-step', childStep, { "done": 'child-end' })
.terminal('child-end')
.build();Parent DAG with embedded-DAG placement
run-child is the embedded-DAG placement. Parent and child both call context.services.log.update(...) against the same backing store, so step-a, child-step, and step-b accumulate to one entry list in execution order:
// run-child is an EmbeddedDAGNode whose body is the registered 'sub-flow' DAG.
// The child shares the same services bag, so child-step appends to the same
// MemoryStore between step-a and step-b.
export const parentDag = new DAGBuilder('main-flow', '1')
.node('step-a', stepA, { "done": 'run-child' })
.embeddedDAG('run-child', 'sub-flow', { "success": 'step-b', "error": 'step-b' })
.node('step-b', stepB, { "done": 'end' })
.terminal('end')
.build();Store initialisation + run
The dispatcher takes the store on services. After execution, the same instance carries the writes from every node:
const logStore = new MemoryStore();
const dispatcher = new Dagonizer<NodeStateBase, Services>({ "services": { "log": logStore } });Full round-trip (normal run, then checkpoint + resume)
The runnable example covers the full lifecycle: a normal run, then a second run that aborts after step-a, captures the partial state plus the store, restores the store into a fresh MemoryStore, and resumes:
{
const logStore = new MemoryStore();
const dispatcher = new Dagonizer<NodeStateBase, Services>({ "services": { "log": logStore } });
dispatcher.registerNode(stepA);
dispatcher.registerNode(stepB);
dispatcher.registerNode(childStep);
dispatcher.registerDAG(childDag);
dispatcher.registerDAG(parentDag);
await dispatcher.execute('main-flow', new NodeStateBase());
const entries = await logStore.get('entries') ?? '';
process.stdout.write('\nPart 1: Normal run:\n');
process.stdout.write(` log.entries = "${entries}"\n`);
// → "step-a,child-step,step-b"
}
// Part 2: Checkpoint round-trip
// Abort after step-a, capture checkpoint with store, restore, resume.
{
const logStore = new MemoryStore();
const dispatcher = new Dagonizer<NodeStateBase, Services>({ "services": { "log": logStore } });
dispatcher.registerNode(stepA);
dispatcher.registerNode(stepB);
dispatcher.registerNode(childStep);
dispatcher.registerDAG(childDag);
dispatcher.registerDAG(parentDag);
// Abort mid-run: abort after step-a to produce a checkpoint-worthy cursor.
const ctl = new AbortController();
const execution = dispatcher.execute('main-flow', new NodeStateBase(), { "signal": ctl.signal });
let seen = 0;
for await (const _event of execution) {
seen++;
if (seen === 1) ctl.abort(new Error('checkpoint'));
}
const partial = await execution;
if (partial.cursor === null) {
process.stdout.write('\nPart 2: run completed before abort; no cursor\n');
} else {
// Capture checkpoint: snapshot the store alongside the parent state.
const ckpt = await Checkpoint.capture('main-flow', partial, { "stores": { "log": logStore } });
const json = ckpt.toJson();
process.stdout.write('\nPart 2: Checkpoint captured:\n');
process.stdout.write(` cursor = "${partial.cursor}"\n`);
process.stdout.write(` log at capture = "${await logStore.get('entries') ?? ''}"\n`);
// Resume: restore store from checkpoint, then resume execution.
const freshLog = new MemoryStore();
const ckpt2 = Checkpoint.load(JSON.parse(json) as unknown);
await ckpt2.restoreStores({ "log": freshLog });
const restoredEntries = await freshLog.get('entries') ?? '';
process.stdout.write(` log after restoreStores = "${restoredEntries}"\n`);
const resumeDispatcher = new Dagonizer<NodeStateBase, Services>({ "services": { "log": freshLog } });
resumeDispatcher.registerNode(stepA);
resumeDispatcher.registerNode(stepB);
resumeDispatcher.registerNode(childStep);
resumeDispatcher.registerDAG(childDag);
resumeDispatcher.registerDAG(parentDag);
const { dagName, state, cursor } = ckpt2.restoreState(
(snap) => NodeStateBase.restore(snap),
);
await resumeDispatcher.resume(dagName, state, cursor);
const finalEntries = await freshLog.get('entries') ?? '';
process.stdout.write(` log after resume = "${finalEntries}"\n`);
// → "step-a,child-step,step-b" (all three present, none duplicated)
}
}What it demonstrates
Storeon the services bag.Dagonizer<TState, TServices>is generic over the services shape. TheMemoryStoreinstance is the same reference every node receives viacontext.services.log. See theservicesregion.- Single store, many writers.
step-a,child-step,step-ball calllog.update('entries', ...)against one instance. Order of the resulting entries reflects execution order, not topology. - Scatter clones inherit the services bag.
child-stepsees the samelogas the parent without anyinputsorgatherfor the store.inputs/gatherare for parent/clone state transfer; stores are orthogonal. Checkpoint.capture({ stores }). Capturing a checkpoint with thestoresoption snapshots each named store alongside the state. The capture is keyed by the same name the services bag uses (log).Checkpoint.load(...).restoreStores({ log: freshLog }). Restores the store contents into a fresh instance. The resumed dispatcher uses the fresh instance on its services bag, so the resume continues from the captured store contents.- Resume is order-preserving. After restoreStores plus resume, the final
entriesvalue isstep-a,child-step,step-bwith no duplication, identical to the normal-run output.
See Shared state for the decision matrix between inputs/gather (point-to-point transfer) and Store (accumulating shared structure), and the concurrency contract for write-write races in parallel placements.