
Phase 05: EmbeddedDAGNode composition

The Archivist uses two packaged sub-DAGs, each placed via .embeddedDAG():

  • book-search-scatter: the full 4-source scout cluster (extract query, decide tools, 4 parallel scouts, rank, merge, record, gate, recall). Placed three times in the parent: on-topic-search, author-search, and similar-search.
  • compose-retry-loop: the compose, validate, retry, respond terminal. Placed once as compose-loop; every successful search branch converges on it.

Each embedded-DAG placement uses the wire field stateMapping.input to seed child fields from parent paths before the body runs and stateMapping.output to copy produced child fields back into the parent after the body completes. (The builder option object spells these inputs / outputs; the serialized JSON-LD wire form is singular.)
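
The seeding and copy-back steps can be sketched without the library. This is a minimal model, not the dagonizer implementation: `PlainState`, `FieldMapping`, `readPath`, `applyMapping`, and `runEmbedded` are hypothetical names, and it assumes each mapping entry reads as destination field to source dotted path.

```typescript
// Hypothetical helper types for illustration; not the dagonizer implementation.
type PlainState = Record<string, unknown>;
type FieldMapping = Record<string, string>; // destination field -> source dotted path

// Read a dotted path such as 'visitor.query' from a plain object.
function readPath(source: PlainState, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as PlainState)[key];
  }
  return current;
}

// Copy each mapped source path onto the destination field; missing paths are skipped.
function applyMapping(destination: PlainState, source: PlainState, mapping: FieldMapping): void {
  for (const [field, path] of Object.entries(mapping)) {
    const value = readPath(source, path);
    if (value !== undefined) destination[field] = value;
  }
}

// Run a child body with input seeding before it and output copy-back after it.
function runEmbedded(
  parent: PlainState,
  body: (child: PlainState) => void,
  stateMapping: { input?: FieldMapping; output?: FieldMapping },
): void {
  const child: PlainState = {};
  applyMapping(child, parent, stateMapping.input ?? {});  // parent -> child, before the body
  body(child);
  applyMapping(parent, child, stateMapping.output ?? {}); // child -> parent, after the body
}

const parentState: PlainState = { visitor: { query: 'dune' }, shortlist: [] };
runEmbedded(
  parentState,
  (child) => {
    child['shortlist'] = [`result for ${String(child['query'])}`];
    child['scratch'] = 'stays in the child';
  },
  { input: { query: 'visitor.query' }, output: { shortlist: 'shortlist' } },
);
```

Only `shortlist` crosses back into the parent here; `scratch` is never listed in the output mapping, so it stays isolated in the child.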

Sub-DAG: the packaged scout cluster

ts
/**
 * BookSearchScatterDAG: reusable query-extract + 4-source parallel scout cluster.
 *
 * Internal flow:
 *
 *   extract-query
 *     └─ success ──► decide-tools
 *   decide-tools
 *     └─ (tools | no-tools) ──► recall-candidates
 *   recall-candidates
 *     └─ recalled ──► book-search-scatter (parallel, combine: collect)
 *          ├─ openlibrary-scout  (OpenLibrary)
 *          ├─ google-books-scout (Google Books)
 *          ├─ subject-scout      (Subject search)
 *          └─ wikipedia-scout    (Wikipedia enrichment)
 *     └─ rank-candidates
 *     └─ merge-candidates
 *          ├─ ranked ──► record-findings
 *          └─ empty  ──► no-results (TerminalNode(failed) → parent EmbeddedDAGNode routes parent error)
 *     └─ record-findings
 *     └─ has-citations-gate
 *          ├─ pass ──► recall-past-visits ──► END (success)
 *          └─ fail ──► no-results (TerminalNode(failed) → parent EmbeddedDAGNode routes parent error)
 *
 * Outputs:
 *   success: query extracted, candidates found, ranked, recorded, and recalled
 *   error:   no candidates after merge, or citations gate failed;
 *             signalled by the no-results TerminalNode(failed) placement so
 *             the parent EmbeddedDAGNode routes the parent placement to its
 *             'error' branch
 *
 * Molecular import pattern:
 *   import { bookSearchScatterBundle } from './embedded-dags/BookSearchScatterDAG.ts';
 *   dispatcher.registerBundle(bookSearchScatterBundle);
 *
 * The sub-DAG reads `state.query` directly (no input stateMapping; the field
 * names already align with the parent). Each parent placement supplies an
 * `outputs` stateMapping that copies the fields the sub-DAG writes:
 * `terms`, `toolPlan`, `candidates`, `shortlist`, `priorContext`,
 * `failureCause` back onto the parent state so the downstream compose,
 * group-by-year, and recall steps can read them.
 *
 * Three EmbeddedDAGNode placements in the parent `the-archivist` DAG reference
 * this one definition. One definition, three usages:
 *   on-topic-search:  general web book search
 *   author-search:    author body-of-work search
 *   similar-search:   recommend-similar search
 *
 * Reviews and describe branches are inlined in the parent because they use
 * distinct post-scout steps (rankByRating and pickBestMatch respectively).
 */

import type { ArchivistState }    from '../ArchivistState.ts';
import { decideTools }        from '../nodes/decideTools.ts';
import { extractQuery }       from '../nodes/extractQuery.ts';
import { hasCitationsGate }   from '../nodes/hasCitationsGate.ts';
import { mergeCandidates }    from '../nodes/mergeCandidates.ts';
import { rankCandidates }     from '../nodes/rankCandidates.ts';
import { recallCandidates }   from '../nodes/recallCandidates.ts';
import { recallPastVisits }   from '../nodes/recallPastVisits.ts';
import { recordFindings }     from '../nodes/recordFindings.ts';
import {
  decideToolsSalvage,
  extractQuerySalvage,
  rankCandidatesSalvage,
} from '../nodes/salvage.ts';
import {
  openLibraryScout,
  googleBooksScout,
  subjectScout,
  wikipediaScout,
} from '../nodes/scouts.ts';
import type { ArchivistServices } from '../services.ts';

import type { DispatcherBundle } from '@noocodex/dagonizer';
import { DAGBuilder } from '@noocodex/dagonizer/builder';
import type { DAG } from '@noocodex/dagonizer/entities';

/**
 * The `book-search-scatter` DAG: one packaged unit that any parent DAG
 * can reference via `.embeddedDAG('placement-name', 'book-search-scatter', routes)`.
 */
export const BookSearchScatterDAG: DAG = new DAGBuilder('book-search-scatter', '1.0')

  // ── 1. extract-query ─────────────────────────────────────────────────────
  // LLM parses the raw visitor question into structured search terms.
  // Writes state.terms for the scouts and decide-tools to consume.
  // 'retry' loops back (bounded by the state retry budget); 'salvage' routes to
  // a deterministic recovery node, so this node never fabricates a term list.
  // #region retry-salvage-wiring
  .node('extract-query', extractQuery, {
    'success': 'decide-tools',
    'retry':   'extract-query',          // flow-shape retry loop (self-edge)
    'salvage': 'extract-query-salvage',  // recovery route
  })
  .node('extract-query-salvage', extractQuerySalvage, {
    'done': 'decide-tools',              // deterministic recovery rejoins the happy path
  })
  // #endregion retry-salvage-wiring

  // ── 2. decide-tools ──────────────────────────────────────────────────────
  // LLM decides which external sources to invoke. Both outputs route into
  // recall-candidates so prior memory is loaded before scouts fire.
  // 'retry' loops back (bounded); 'salvage' routes to the minimal-plan node.
  .node('decide-tools', decideTools, {
    'tools':    'recall-candidates',
    'no-tools': 'recall-candidates',
    'retry':    'decide-tools',
    'salvage':  'decide-tools-salvage',
  })
  .node('decide-tools-salvage', decideToolsSalvage, {
    'done': 'recall-candidates',
  })

  // ── 2b. recall-candidates ────────────────────────────────────────────────
  // Pre-loads state.priorCandidates from memory: shortlisted books from prior
  // runs whose visitor query has Jaccard >= 0.35 overlap with the current
  // query. Cap 10. Always routes 'recalled', even when no prior runs match.
  .node('recall-candidates', recallCandidates, {
    'recalled': 'book-search-scatter',
  })

  // ── 3. book-search-scatter ───────────────────────────────────────────────
  // All four scouts run concurrently. combine:'collect' waits for all four
  // and merges their state mutations. Each scout writes to state.candidates.
  .parallel('book-search-scatter', ['openlibrary-scout', 'google-books-scout', 'subject-scout', 'wikipedia-scout'], 'collect', {
    'success': 'rank-candidates',
    'error':   'rank-candidates',
  })
  .node('openlibrary-scout',  openLibraryScout, { 'success': null, 'empty': null })
  .node('google-books-scout', googleBooksScout, { 'success': null, 'empty': null })
  .node('subject-scout',      subjectScout,     { 'success': null, 'empty': null })
  .node('wikipedia-scout',    wikipediaScout,   { 'success': null, 'empty': null })

  // ── 4. rank-candidates ───────────────────────────────────────────────────
  // LLM-driven relevance scoring. Routes 'ranked' on success (an empty set is
  // still a valid ranking, so merge can soft-gate on zero candidates).
  // 'retry' loops back (bounded); 'salvage' passes candidates through unranked
  // via a dedicated node rather than emitting them as if they were ranked.
  .node('rank-candidates', rankCandidates, {
    'ranked':  'merge-candidates',
    'retry':   'rank-candidates',
    'salvage': 'rank-candidates-salvage',
  })
  .node('rank-candidates-salvage', rankCandidatesSalvage, {
    'done': 'merge-candidates',
  })

  // ── 5. merge-candidates ──────────────────────────────────────────────────
  // Cross-source dedupe via CanonicalId, top-5. Routes 'empty' to
  // no-results (TerminalNode(failed)) so the parent EmbeddedDAGNode's
  // terminal outcome routes the parent placement to its 'error' branch.
  .node('merge-candidates', mergeCandidates, {
    'ranked': 'record-findings',
    'empty':  'no-results',
  })

  // ── 6. record-findings ───────────────────────────────────────────────────
  // Deterministic RDF write: same input always produces the same triples.
  .node('record-findings', recordFindings, {
    'recorded': 'has-citations-gate',
  })

  // ── 7. has-citations-gate ────────────────────────────────────────────────
  // SPARQL ASK over the per-run state graph. Symbolic fence for the LLM.
  // 'fail' routes to no-results (TerminalNode(failed)) so the parent
  // EmbeddedDAGNode routes the parent placement to 'error'.
  .node('has-citations-gate', hasCitationsGate, {
    'pass': 'recall-past-visits',
    'fail': 'no-results',
  })

  // ── 8. recall-past-visits ────────────────────────────────────────────────
  // Injects prior-session context (prior queries + shortlisted titles) into
  // state.priorContext, then routes to the canonical `found` TerminalNode
  // (completed) so the parent EmbeddedDAGNode resolves its 'success' branch.
  .node('recall-past-visits', recallPastVisits, {
    'recalled': 'found',
  })

  // ── 9. Terminal nodes ────────────────────────────────────────────────────
  // Both sub-DAG exits are canonical TerminalNode placements (no bare null
  // routes): `found` (completed) drives the parent EmbeddedDAGNode's 'success'
  // branch; `no-results` (failed) drives its 'error' branch.
  .terminal('found', 'completed')
  .terminal('no-results', 'failed')

  .build();

/**
 * Bundle of every node used by `BookSearchScatterDAG` plus the DAG itself.
 * Register with `dispatcher.registerBundle(bookSearchScatterBundle)`; nodes
 * register before the DAG so the validator resolves all node references.
 */
export const bookSearchScatterBundle: DispatcherBundle<ArchivistState, ArchivistServices> = {
  'nodes': [
    extractQuery, decideTools, recallCandidates, openLibraryScout,
    googleBooksScout, subjectScout, wikipediaScout, rankCandidates,
    mergeCandidates, recordFindings, hasCitationsGate, recallPastVisits,
    extractQuerySalvage, decideToolsSalvage, rankCandidatesSalvage,
  ],
  'dags': [BookSearchScatterDAG],
};
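
The recall-candidates comment mentions a Jaccard overlap threshold of 0.35 and a cap of 10. A rough sketch of that filter follows. The tokenization (lowercase alphanumeric word sets), the `PriorRun` shape, and the function names are assumptions for illustration; the real recallCandidates node may compare queries differently.

```typescript
// Assumption: queries are compared as sets of lowercase alphanumeric word tokens.
function tokenize(query: string): Set<string> {
  return new Set(query.toLowerCase().split(/[^a-z0-9]+/).filter((token) => token.length > 0));
}

// Jaccard overlap |A ∩ B| / |A ∪ B|, defined here as 0 when both sets are empty.
function jaccard(a: Set<string>, b: Set<string>): number {
  let shared = 0;
  a.forEach((token) => {
    if (b.has(token)) shared += 1;
  });
  const union = a.size + b.size - shared;
  return union === 0 ? 0 : shared / union;
}

// Hypothetical shape of one remembered run.
interface PriorRun {
  query: string;
  shortlist: string[];
}

// Collect shortlisted titles from prior runs that clear the threshold, up to the cap.
function recallPrior(current: string, runs: PriorRun[], threshold = 0.35, cap = 10): string[] {
  const currentTokens = tokenize(current);
  const recalled: string[] = [];
  for (const run of runs) {
    if (jaccard(currentTokens, tokenize(run.query)) < threshold) continue;
    for (const title of run.shortlist) {
      if (recalled.length >= cap) return recalled;
      if (!recalled.includes(title)) recalled.push(title);
    }
  }
  return recalled;
}

// Illustrative data only.
const recalledTitles = recallPrior('books about desert planets', [
  { query: 'novels about desert planets', shortlist: ['Dune'] },
  { query: 'victorian poetry anthologies', shortlist: ['Goblin Market'] },
]);
```

The first prior query shares three of five distinct tokens with the current one (overlap 0.6), so its shortlist is recalled; the second shares none and is skipped.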

Parent DAG: the embedded-DAG placements

The #embedded-dag-placements region contains the four .embeddedDAG(...) calls (three placements of book-search-scatter and one of compose-retry-loop), shown here together with the inlined branches that sit between them:

ts
// ── on-topic branch ──────────────────────────────────────────────────────
// EmbeddedDAGNode: book-search-scatter handles extract-query, decide-tools,
// all four scouts, rank-candidates, merge, record, gate, and recall.
// One packaged cluster; first of three placements of the same sub-DAG.
// The outputs stateMapping copies the fields the sub-DAG writes back to the
// parent state so compose-loop and group-by-year can read them.
.embeddedDAG('on-topic-search', 'book-search-scatter', {
  'success': 'compose-loop',
  'error':   'compose-empty',
}, {
  'outputs': {
    'terms':         'terms',
    'toolPlan':      'toolPlan',
    'candidates':    'candidates',
    'shortlist':     'shortlist',
    'priorContext':  'priorContext',
    'failureCause':  'failureCause',
  },
})

// ── lookup-author branch ─────────────────────────────────────────────────
// EmbeddedDAGNode: same book-search-scatter cluster, second placement.
// After success, group-by-year sorts results chronologically before the
// compose loop; author surveys read better in publication-timeline order.
.embeddedDAG('author-search', 'book-search-scatter', {
  'success': 'group-by-year',
  'error':   'compose-empty',
}, {
  'outputs': {
    'terms':         'terms',
    'toolPlan':      'toolPlan',
    'candidates':    'candidates',
    'shortlist':     'shortlist',
    'priorContext':  'priorContext',
    'failureCause':  'failureCause',
  },
})
// group-by-year is author-branch-specific: sorts shortlist chronologically.
.node('group-by-year', groupByYear, {
  'ordered': 'compose-loop',
})

// ── find-reviews branch ───────────────────────────────────────────────────
// Inlined. Uses rankByRating (deterministic, rating-weighted) in place of
// rankCandidates (LLM-driven). The Google Books scout carries notes.rating /
// notes.ratingsCount; rankByRating weights those for reviews-style output.
.node('reviews-extract', extractQuery, {
  'success': 'reviews-decide-tools',
  'retry':   'reviews-extract',
  'salvage': 'reviews-extract-salvage',
})
.node('reviews-extract-salvage', extractQuerySalvage, {
  'done': 'reviews-decide-tools',
})
.node('reviews-decide-tools', decideTools, {
  'tools':    'reviews-scatter',
  'no-tools': 'reviews-scatter',
  'retry':    'reviews-decide-tools',
  'salvage':  'reviews-decide-tools-salvage',
})
.node('reviews-decide-tools-salvage', decideToolsSalvage, {
  'done': 'reviews-scatter',
})
.parallel('reviews-scatter', ['reviews-ol', 'reviews-gb', 'reviews-subject', 'reviews-wiki'], 'collect', {
  'success': 'reviews-rank',
  'error':   'reviews-rank',
})
.node('reviews-ol',      openLibraryScout, { 'success': null, 'empty': null })
.node('reviews-gb',      googleBooksScout, { 'success': null, 'empty': null })
.node('reviews-subject', subjectScout,     { 'success': null, 'empty': null })
.node('reviews-wiki',    wikipediaScout,   { 'success': null, 'empty': null })
.node('reviews-rank',    rankByRating,     { 'ranked': 'reviews-merge' })
.node('reviews-merge',   mergeCandidates,  { 'ranked': 'reviews-record', 'empty': 'compose-empty' })
.node('reviews-record',  recordFindings,   { 'recorded': 'reviews-gate' })
.node('reviews-gate',    hasCitationsGate, { 'pass': 'reviews-recall', 'fail': 'compose-empty' })
.node('reviews-recall',  recallPastVisits, { 'recalled': 'compose-loop' })

// ── describe-book branch ─────────────────────────────────────────────────
// Inlined. Uses pickBestMatch to narrow multi-hit results to the top-3
// title-similar candidates before merge. Ensures the composer receives the
// specific book the visitor named, not arbitrary top-5 hits.
.node('describe-extract',      extractQuery,     { 'success': 'describe-decide-tools', 'retry': 'describe-extract', 'salvage': 'describe-extract-salvage' })
.node('describe-extract-salvage', extractQuerySalvage, { 'done': 'describe-decide-tools' })
.node('describe-decide-tools', decideTools,      { 'tools': 'describe-scatter', 'no-tools': 'describe-scatter', 'retry': 'describe-decide-tools', 'salvage': 'describe-decide-tools-salvage' })
.node('describe-decide-tools-salvage', decideToolsSalvage, { 'done': 'describe-scatter' })
.parallel('describe-scatter', ['describe-ol', 'describe-gb', 'describe-subject', 'describe-wiki'], 'collect', {
  'success': 'describe-pick',
  'error':   'compose-empty',
})
.node('describe-ol',      openLibraryScout, { 'success': null, 'empty': null })
.node('describe-gb',      googleBooksScout, { 'success': null, 'empty': null })
.node('describe-subject', subjectScout,     { 'success': null, 'empty': null })
.node('describe-wiki',    wikipediaScout,   { 'success': null, 'empty': null })
.node('describe-pick',   pickBestMatch,    { 'picked': 'describe-merge' })
.node('describe-merge',  mergeCandidates,  { 'ranked': 'describe-record', 'empty': 'compose-empty' })
.node('describe-record', recordFindings,   { 'recorded': 'describe-gate' })
.node('describe-gate',   hasCitationsGate, { 'pass': 'describe-recall', 'fail': 'compose-empty' })
.node('describe-recall', recallPastVisits, { 'recalled': 'compose-loop' })

// ── recommend-similar branch ─────────────────────────────────────────────
// recommendSimilar seeds state.terms from prior-run shortlist memory.
// 'seeded' routes to the book-search-scatter sub-DAG; third placement of
// the same packaged cluster. 'empty' routes to the compose-empty terminal.
.node('recommend-similar', recommendSimilar, {
  'seeded': 'similar-search',
  'empty':  'compose-empty',
})

// EmbeddedDAGNode: same book-search-scatter, third and final placement.
.embeddedDAG('similar-search', 'book-search-scatter', {
  'success': 'compose-loop',
  'error':   'compose-empty',
}, {
  'outputs': {
    'terms':         'terms',
    'toolPlan':      'toolPlan',
    'candidates':    'candidates',
    'shortlist':     'shortlist',
    'priorContext':  'priorContext',
    'failureCause':  'failureCause',
  },
})

// ── compose-loop: shared compose/validate sub-DAG ──────────────────────────
// All branches that successfully find candidates converge here.
// composeResponse → validateResponse in a retry loop, bounded by the retry
// budget on state (retriesFor('compose')).
// One sub-DAG definition serves every convergent branch.
// stateMapping.outputs copies the compose loop's writes back to the parent.
//
// Convergence policy: 'success' routes to the shared respond-to-visitor terminal
// at the parent level; the sub-DAG produces state.draft and exits cleanly;
// exactly ONE respond-to-visitor fires per run regardless of branch count.
// 'error' (retry budget exhausted) falls through to compose-empty so the
// visitor always receives an in-character response rather than a silent drop.
.embeddedDAG('compose-loop', 'compose-retry-loop', {
  'success': 'respond-to-visitor',
  'error':   'compose-empty',
}, {
  'outputs': {
    'draft':    'draft',
    'approved': 'approved',
    'attempts': 'attempts',
  },
})
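
The 'retry' self-edge and 'salvage' recovery route used in both DAGs can be modelled as a small route-following loop. This is a sketch under stated assumptions, not the dagonizer runtime: `LoopState`, `flakyExtract`, `salvageExtract`, and `runLoop` are hypothetical, and the budget check lives in the node here purely for illustration.

```typescript
// Hypothetical miniature of a flow-shape retry loop; not the dagonizer runtime.
interface LoopState {
  query: string;
  terms: string[];
  attempts: number;
  retryBudget: number;
}
type Step = (state: LoopState) => string;    // returns an output name
type Routes = Record<string, string | null>; // output name -> next placement (null ends the run)

// Stand-in for an LLM-backed node that fails on every attempt.
const flakyExtract: Step = (state) => {
  state.attempts += 1;
  return state.attempts <= state.retryBudget ? 'retry' : 'salvage';
};

// Deterministic recovery: derive terms from the raw query, nothing fabricated.
const salvageExtract: Step = (state) => {
  state.terms = state.query.toLowerCase().split(/\s+/).filter((word) => word.length > 0);
  return 'done';
};

const placements: Record<string, { step: Step; routes: Routes }> = {
  'extract-query': {
    step: flakyExtract,
    routes: { success: null, retry: 'extract-query', salvage: 'extract-query-salvage' },
  },
  'extract-query-salvage': { step: salvageExtract, routes: { done: null } },
};

// Follow routes from the entry placement until a null target ends the run.
function runLoop(entry: string, state: LoopState): string[] {
  const visited: string[] = [];
  let current: string | null = entry;
  while (current !== null) {
    const placement = placements[current];
    if (placement === undefined) throw new Error(`unknown placement: ${current}`);
    visited.push(current);
    const output = placement.step(state);
    const next = placement.routes[output];
    if (next === undefined) throw new Error(`unrouted output: ${output}`);
    current = next;
  }
  return visited;
}

const loopState: LoopState = { query: 'Dune sequels', terms: [], attempts: 0, retryBudget: 2 };
const visitedPlacements = runLoop('extract-query', loopState);
```

With a budget of two retries the extract placement runs three times, then the salvage placement runs once and the loop ends; the retry edge cannot spin forever because the budget is carried on state.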

EmbeddedDAGNode output routing: null and named terminals

An EmbeddedDAGNode placement's routes map (each output, such as success or error, mapped to a target) accepts two target forms:

  • null: the branch ends with outcome: completed. Identical to any other null route, sugar for an implicit completed terminal. Use it when the parent flow has a single clean termination path and the lifecycle outcome is always completed.
  • Named TerminalNode placement: target an explicit terminal declared via .terminal(name, outcome?). The idiomatic form when the error output should mark the parent flow as failed, or when the diagram should show the endpoint as a discrete node.

ts
// null route: both success and error end with outcome=completed
.embeddedDAG('invoke', 'child', { success: null, error: null })

// named terminals: error path marks the parent flow as failed
.embeddedDAG('invoke', 'child', { success: 'end-ok', error: 'end-fail' })
.terminal('end-ok')
.terminal('end-fail', 'failed')

See Phase 09: Terminal placements for the full pattern with runnable examples.
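
The difference between the two target forms can be sketched as a small resolver. It is a simplified model, not library code: `PlacementSketch` and `resolvePlacement` are hypothetical, every non-null target is assumed to be a terminal, and .terminal(name) without an outcome is assumed to default to completed.

```typescript
type TerminalOutcome = 'completed' | 'failed';
type RouteTarget = string | null;

// Hypothetical, flattened view of one placement and the parent's terminals.
interface PlacementSketch {
  routes: { success: RouteTarget; error: RouteTarget };
  terminals: Record<string, TerminalOutcome>;
}

// Map the child's terminal outcome to an output, then resolve the parent outcome.
function resolvePlacement(
  childOutcome: TerminalOutcome,
  placement: PlacementSketch,
): { output: 'success' | 'error'; parentOutcome: TerminalOutcome } {
  const output = childOutcome === 'completed' ? 'success' : 'error';
  const target = placement.routes[output];
  // A null route behaves like an implicit completed terminal.
  if (target === null) return { output, parentOutcome: 'completed' };
  const declared = placement.terminals[target];
  if (declared === undefined) throw new Error(`no terminal named ${target}`);
  return { output, parentOutcome: declared };
}

const nullRoutes: PlacementSketch = {
  routes: { success: null, error: null },
  terminals: {},
};
const namedRoutes: PlacementSketch = {
  routes: { success: 'end-ok', error: 'end-fail' },
  terminals: { 'end-ok': 'completed', 'end-fail': 'failed' },
};

const viaNull = resolvePlacement('failed', nullRoutes);
const viaNamed = resolvePlacement('failed', namedRoutes);
```

A failed child takes the 'error' output in both cases, but only the named-terminal form marks the parent flow as failed.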

What it demonstrates

  • .embeddedDAG(name, dagName, routes, options). The placement references the sub-DAG by its registered name. The parent and child run in the same dispatcher; the child shares the same node registry.
  • stateMapping.input (wire) / inputs (builder option). Before the body runs, the dispatcher copies the listed parent fields into the child. The child receives the seed; the body then reads from the child.
  • stateMapping.output (wire) / outputs (builder option). After the body completes, the dispatcher copies the listed child fields back into the parent. Fields not listed stay isolated.
  • One definition, three placements. book-search-scatter is registered once and placed three times with distinct placement names. Each placement declares its own 'success' and 'error' routes: on-topic-search and similar-search go to compose-loop on success, author-search goes to group-by-year, and all three fall back to compose-empty on error.
  • Errors bubble up. Anything the child accumulates via state.collectError reaches the parent's error accumulator automatically. The child's terminal outcome selects the placement's output: a completed terminal takes 'success', a failed terminal takes 'error'.
  • bookSearchScatterBundle and composeRetryLoopBundle. Each sub-DAG module exports a DispatcherBundle packaging its nodes plus its DAG. dispatcher.registerBundle(bundle) installs the nodes before the DAG; register both embedded-DAG bundles before the parent archivistBundle.
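
Why registration order matters can be illustrated with a toy registry that validates references at registration time. `MiniDispatcher`, `MiniDag`, and `MiniBundle` are hypothetical stand-ins, and the node lists are abbreviated; the real dispatcher's validation may be stricter or report errors differently.

```typescript
// Hypothetical stand-ins; names and shapes are assumptions for illustration.
interface MiniDag {
  name: string;
  nodes: string[];  // node names the DAG places
  embeds: string[]; // names of DAGs it embeds
}
interface MiniBundle {
  nodes: string[];
  dags: MiniDag[];
}

class MiniDispatcher {
  private readonly nodeNames = new Set<string>();
  private readonly dagNames = new Set<string>();

  // Every node and embedded-DAG reference must already be registered.
  registerDAG(dag: MiniDag): void {
    for (const node of dag.nodes) {
      if (!this.nodeNames.has(node)) throw new Error(`unknown node: ${node}`);
    }
    for (const body of dag.embeds) {
      if (!this.dagNames.has(body)) throw new Error(`unknown embedded DAG: ${body}`);
    }
    this.dagNames.add(dag.name);
  }

  // Nodes go in first so the bundle's own DAGs can resolve them.
  registerBundle(bundle: MiniBundle): void {
    bundle.nodes.forEach((node) => this.nodeNames.add(node));
    bundle.dags.forEach((dag) => this.registerDAG(dag));
  }
}

const scatterBundle: MiniBundle = {
  nodes: ['extractQuery', 'rankCandidates'],
  dags: [{ name: 'book-search-scatter', nodes: ['extractQuery', 'rankCandidates'], embeds: [] }],
};
const composeBundle: MiniBundle = {
  nodes: ['composeResponse', 'validateResponse'],
  dags: [{ name: 'compose-retry-loop', nodes: ['composeResponse', 'validateResponse'], embeds: [] }],
};
const parentBundle: MiniBundle = {
  nodes: ['groupByYear'],
  dags: [{ name: 'the-archivist', nodes: ['groupByYear'], embeds: ['book-search-scatter', 'compose-retry-loop'] }],
};

// Parent first: the embedded bodies are not registered yet, so validation fails.
let rejectedEarly = false;
try {
  new MiniDispatcher().registerBundle(parentBundle);
} catch {
  rejectedEarly = true;
}

// Embedded bundles first, parent last: every reference resolves.
const dispatcherSketch = new MiniDispatcher();
[scatterBundle, composeBundle, parentBundle].forEach((bundle) => dispatcherSketch.registerBundle(bundle));
```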

See this in action in the Archivist live demo.

Typed stateMapping and growing shared state

The .embeddedDAG() call accepts TChildState and TParentState generic parameters that narrow options.inputs keys and options.outputs paths to dotted paths that exist on the respective state at compile time:

ts
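class ParentState extends NodeStateBase {
  userQuery = '';
  candidates: string[] = [];
}

// Child state of the embedded DAG; these field names are illustrative.
class ChildState extends NodeStateBase {
  query = '';
  searchResults: string[] = [];
}

builder.embeddedDAG<ChildState, ParentState>('search', 'book-search-scatter',
  { success: 'compose-loop', error: 'compose-empty' },
  {
    inputs:  { query: 'userQuery' },               // child 'query' is seeded from 'userQuery', which must be a path on ParentState
    outputs: { 'candidates': 'searchResults' },    // parent 'candidates' is copied from 'searchResults', which must be a path on ChildState
  },
);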

A misspelled parent-state path is a compile error.
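
One way such compile-time narrowing can be built is with template literal types. The sketch below is not the library's actual typing: `DottedPath`, `TypedMapping`, `typedMapping`, and the two state interfaces are hypothetical, and arrays are treated as leaves for simplicity.

```typescript
// Dotted paths through nested plain objects; arrays and primitives are leaves.
type DottedPath<T> = {
  [K in keyof T & string]: T[K] extends readonly unknown[]
    ? K
    : T[K] extends object
      ? K | `${K}.${DottedPath<T[K]>}`
      : K;
}[keyof T & string];

// Hypothetical state shapes for the sketch.
interface SketchParent {
  userQuery: string;
  candidates: string[];
  visitor: { locale: string };
}
interface SketchChild {
  query: string;
  searchResults: string[];
}

// inputs: child field <- parent path; outputs: parent path <- child path.
interface TypedMapping<TChild, TParent> {
  inputs?: Partial<Record<keyof TChild & string, DottedPath<TParent>>>;
  outputs?: Partial<Record<DottedPath<TParent>, DottedPath<TChild>>>;
}

// Identity helper whose only job is to apply the type constraints.
function typedMapping<TChild, TParent>(
  mapping: TypedMapping<TChild, TParent>,
): TypedMapping<TChild, TParent> {
  return mapping;
}

const acceptedMapping = typedMapping<SketchChild, SketchParent>({
  inputs: { query: 'userQuery' },
  outputs: { candidates: 'searchResults' },
});

const misspelledMapping = typedMapping<SketchChild, SketchParent>({
  // @ts-expect-error 'userQurey' is not a dotted path on SketchParent
  inputs: { query: 'userQurey' },
});
```

The `@ts-expect-error` directive documents that the second call is rejected by the type checker; nothing is validated at runtime in this sketch.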

stateMapping is the right tool when the relationship between parent and child is a pure field transfer at a single boundary. When multiple embedded-DAG placements accumulate to a single growing structure (agent memory, a ranked-results list, an audit log), thread a Store through the services bag instead. The store lives outside the DAG topology; every placement reads and writes to the same instance without threading values through stateMapping at every hop. See Shared state for the decision matrix, the concurrency contract, and checkpoint integration.
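
The store-through-services idea can be sketched as follows. `ShortlistStore`, `SketchServices`, and `searchPlacement` are hypothetical names, the store is a plain in-memory class, and the sketch is synchronous, so it says nothing about the concurrency contract a real Store has to honour.

```typescript
// Hypothetical in-memory store; a real Store also has to handle concurrency.
class ShortlistStore {
  private readonly titles: string[] = [];

  // Append titles, skipping ones that are already present.
  add(found: readonly string[]): void {
    for (const title of found) {
      if (!this.titles.includes(title)) this.titles.push(title);
    }
  }

  // Return a copy so callers cannot mutate the store's internal list.
  snapshot(): string[] {
    return this.titles.slice();
  }
}

// The services bag every placement receives; the store is one shared instance.
interface SketchServices {
  shortlist: ShortlistStore;
}

// Stand-in for one embedded-DAG placement writing its findings.
function searchPlacement(found: readonly string[], services: SketchServices): void {
  services.shortlist.add(found);
}

const sharedServices: SketchServices = { shortlist: new ShortlistStore() };
searchPlacement(['Dune'], sharedServices);                    // e.g. on-topic-search
searchPlacement(['Dune', 'Children of Dune'], sharedServices); // e.g. author-search
searchPlacement(['Hyperion'], sharedServices);                // e.g. similar-search

const accumulated = sharedServices.shortlist.snapshot();
```

No placement needs an inputs or outputs entry for the shortlist; each one reaches the same instance through the services bag.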

Composing the same flow via DAGDeriver.embeddedDAGs

The DAGBuilder .embeddedDAG(...) path above is the deterministic authoring surface. The same EmbeddedDAGNode can be produced declaratively via the DAGDeriver embeddedDAGs annotation when the surrounding flow is agent-style:

ts
DAGDeriver.derive({
  name: 'parent',
  version: '1',
  entrypoint: 'prepare',
  contracts: [
    { name: 'prepare',       hardRequired: ['input'],         produces: ['intermediate'], outputs: ['success'] },
    { name: 'invoke-plugin', hardRequired: ['intermediate'],  produces: ['childResult'],  outputs: ['success', 'error'] },
    { name: 'finalize',      hardRequired: ['childResult'],   produces: ['final'],        outputs: ['success'] },
  ],
  annotations: {
    embeddedDAGs: {
      'invoke-plugin': {
        dag:     'plugin:transform',
        outputs: ['success', 'error'],
        stateMapping: {
          input:  { intermediate: 'intermediate' },
          output: { childResult:  'childResult' },
        },
      },
    },
  },
});

  • The chain from each contract's produces to the next contract's hardRequired still drives topology; the embeddedDAGs annotation renders the annotated stage as an EmbeddedDAGNode. stateMapping.input seeds the child; stateMapping.output copies child fields back.
  • Every port in the annotation's outputs list auto-wires to the next derived stage. Use terminals to override individual ports when the error path needs a different target.
  • Body references resolve at registerDAG time; the dispatcher's existing cycle check rejects self-referential embedded-DAG bodies.
  • A runnable demonstration ships in examples/derive.ts (npm run example:derive).
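
How a produces to hardRequired chain can determine stage order is sketched below. This is a simplified guess at the idea, not DAGDeriver's algorithm: `SketchContract`, `deriveChain`, and `wireOutputs` are hypothetical, and the sketch only handles a linear chain.

```typescript
// Hypothetical contract shape mirroring the fields used in the example above.
interface SketchContract {
  name: string;
  hardRequired: string[];
  produces: string[];
  outputs: string[];
}

// Repeatedly pick a contract whose required fields are all available.
function deriveChain(contracts: SketchContract[], initialFields: string[]): string[] {
  const available = new Set<string>(initialFields);
  const remaining = contracts.slice();
  const order: string[] = [];
  while (remaining.length > 0) {
    const index = remaining.findIndex((contract) =>
      contract.hardRequired.every((field) => available.has(field)));
    if (index === -1) throw new Error('no remaining contract has its hardRequired fields satisfied');
    const next = remaining.splice(index, 1)[0];
    if (next === undefined) break;
    next.produces.forEach((field) => available.add(field));
    order.push(next.name);
  }
  return order;
}

// Wire every output port of a stage to the next stage; the last stage ends with null.
function wireOutputs(
  order: string[],
  contracts: SketchContract[],
): Record<string, Record<string, string | null>> {
  const edges: Record<string, Record<string, string | null>> = {};
  order.forEach((name, position) => {
    const contract = contracts.find((candidate) => candidate.name === name);
    if (contract === undefined) return;
    const target = order[position + 1] ?? null;
    const routes: Record<string, string | null> = {};
    contract.outputs.forEach((port) => {
      routes[port] = target;
    });
    edges[name] = routes;
  });
  return edges;
}

// Deliberately out of order to show that the field chain, not list order, decides.
const sketchContracts: SketchContract[] = [
  { name: 'finalize',      hardRequired: ['childResult'],  produces: ['final'],        outputs: ['success'] },
  { name: 'invoke-plugin', hardRequired: ['intermediate'], produces: ['childResult'],  outputs: ['success', 'error'] },
  { name: 'prepare',       hardRequired: ['input'],        produces: ['intermediate'], outputs: ['success'] },
];

const derivedOrder = deriveChain(sketchContracts, ['input']);
const derivedEdges = wireOutputs(derivedOrder, sketchContracts);
```

Both ports of invoke-plugin land on finalize in this model, which is the auto-wiring behaviour described in the second bullet; an override for the error port would replace that one entry.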

See Authoring DAGs for the decision matrix between the imperative .embeddedDAG() path and the declarative embeddedDAGs annotation.
