DAGBuilder
DAGBuilder is a chainable authoring API for deterministic workflows you control end-to-end — ETL pipelines, transformation chains, fixed sequences where the order IS the spec. TypeScript narrows the routes map at each .node() call from the node's TOutput union, so misspelled routes are compile errors before the DAG runs.
If your flow is agent-style — operations declare data dependencies and you want the topology to fall out automatically — use DAGDeriver instead. See Authoring DAGs for the decision matrix. Both surfaces produce the same canonical DAG JSON-LD object; pick the one that matches the mental model you use to describe the flow.
Basic usage
The flow this snippet builds:
import { DAGBuilder, Dagonizer, NodeStateBase } from '@noocodex/dagonizer';
import type { NodeInterface } from '@noocodex/dagonizer';
class S extends NodeStateBase { value = 0; }
const validate: NodeInterface<S, 'valid' | 'invalid'> = {
name: 'validate',
outputs: ['valid', 'invalid'],
async execute(state) {
return { output: state.value > 0 ? 'valid' : 'invalid' };
},
};
const process: NodeInterface<S, 'success'> = {
name: 'process',
outputs: ['success'],
async execute(state) {
state.value *= 2;
return { output: 'success' };
},
};
const dag = new DAGBuilder('pipeline', '1.0')
.node('validate', validate, { valid: 'process', invalid: null })
.node('process', process, { success: null })
.build();
const dispatcher = new Dagonizer<S>();
dispatcher.registerNode(validate);
dispatcher.registerNode(process);
dispatcher.registerDAG(dag);The first .node() call sets the entrypoint automatically. Call .entrypoint('name') to override.
Type-safe output routing
When the node declares a narrow TOutput union, .node() enforces exhaustive routing at compile time:
// NodeInterface<S, 'ok' | 'warn' | 'error'>
.node('check', checkNode, {
ok: 'save',
warn: 'log',
// error: ??? ← TypeScript error: property 'error' is missing
})⦿ Contract-aware authoring
When the underlying NodeInterface carries a contract field (hardRequired + produces), build() runs the same dangling-read / dead-write validation that DAGDeriver runs at derive time — drift fails at build time, not run time.
- Dangling read — a non-entrypoint node declares
hardRequired: ['foo']but no upstream node produces'foo'. ThrowsDAGError. - Dead write — a node declares
produces: ['bar']but no downstream nodehardRequires'bar'. Fires theonContractWarningcallback (non-fatal).
import { DAGBuilder, DAGError } from '@noocodex/dagonizer';
import type { NodeInterface } from '@noocodex/dagonizer';
import type { NodeStateBase } from '@noocodex/dagonizer';
const fetchNode: NodeInterface<NodeStateBase, 'success'> = {
name: 'fetch',
outputs: ['success'],
contract: { hardRequired: ['url'], produces: ['raw'] },
async execute(state) { return { output: 'success' }; },
};
const parseNode: NodeInterface<NodeStateBase, 'success'> = {
name: 'parse',
outputs: ['success'],
// Deliberate mismatch: hardRequires 'data' but upstream only produces 'raw'
contract: { hardRequired: ['data'], produces: ['record'] },
async execute(state) { return { output: 'success' }; },
};
// Throws DAGError: node 'parse' hardRequires 'data' but no upstream node produces it.
new DAGBuilder('pipeline', '1.0')
.node('fetch', fetchNode, { success: 'parse' })
.node('parse', parseNode, { success: null })
.build();Pass an onContractWarning callback to capture dead writes:
const dag = new DAGBuilder('pipeline', '1.0')
.node('fetch', fetchNode, { success: 'parse' })
.node('parse', parseNode, { success: null })
.build((message) => {
console.warn('[contract]', message);
});Placements added via .parallel() or .deepDAG() do not receive a NodeInterface and are not tracked in the impl registry — they are silently skipped during contract validation, preventing false-positive dangling-read errors for node names declared elsewhere.
The onContractWarning hook on build() fires at construction time and is local to the builder call. When you register the resulting DAG with a Dagonizer subclass, the dispatcher's onContractWarning hook fires again at registerDAG time if the nodes carry co-located contracts. See Contract-derived flows and Reference: contracts.
⦿ Bypassing the fluent API — DAGBuilder.fromNodes()
For the common case where your flow is linear and every node carries a contract, you can skip the fluent chain entirely:
import { DAGBuilder } from '@noocodex/dagonizer';
const dag = DAGBuilder.fromNodes({
name: 'pipeline',
version: '1.0',
entrypoint: 'fetch',
nodes: [fetchNode, parseNode, saveNode],
});This is exactly equivalent to the fluent chain below — both produce the same canonical DAG document:
// Equivalent fluent form
const dag = new DAGBuilder('pipeline', '1.0')
.node('fetch', fetchNode, { success: 'parse' })
.node('parse', parseNode, { success: 'save' })
.node('save', saveNode, { success: null })
.build();DAGBuilder.fromNodes() delegates to DAGDeriver.derive({ nodes }) — the same deriver that powers contract-first topology. Use it when the shape is linear and all nodes carry contracts. Drop into the fluent .node() API when you need:
- Fan-out / fan-in placements
- Terminal routes to
nullmid-flow - Deep-DAG (sub-DAG) compositions
- Explicit entrypoint overrides
- Non-contract nodes that still appear in the placement list
⦿ Parallel group
const dag = new DAGBuilder('enrich', '1')
.node('fetch-a', fetchA, { success: null, error: null })
.node('fetch-b', fetchB, { success: null, error: null })
.parallel('enrich-both', ['fetch-a', 'fetch-b'], 'all-success', {
success: 'save',
error: null,
})
.node('save', saveNode, { success: null })
.entrypoint('enrich-both')
.build();Note: nodes listed in parallel() must already be declared. The builder does not validate this — registerDAG does.
Fan-out
import type { FanInConfig } from '@noocodex/dagonizer';
const fanIn: FanInConfig = {
strategy: 'partition',
partitions: { success: 'processed', error: 'failed' },
};
const dag = new DAGBuilder('batch', '1')
.fanOut('process-items', processNode, 'items', fanIn, {
'all-success': null,
'partial': null,
'all-error': null,
'empty': null,
}, { concurrency: 4 })
.build();Sub-DAG
const dag = new DAGBuilder('parent', '1')
.deepDAG('run-child', 'child-dag-name', { success: 'finalize', error: 'finalize' }, {
stateMapping: {
input: { childKey: 'parent.value' },
output: { 'parent.result': 'childResult' },
},
})
.node('finalize', finalizeNode, { success: null })
.build();.entrypoint()
By default the first added node is the entrypoint. Override explicitly:
new DAGBuilder('dag', '1')
.node('setup', setupNode, { success: 'main' })
.node('main', mainNode, { success: null })
.entrypoint('main') // skip setup during a resume, for example
.build();.build()
build() materializes the accumulated nodes and returns a DAG. It throws an Error if no entrypoint has been set (no nodes added and .entrypoint() not called).
The returned object is identical to one written by hand — pass it directly to dispatcher.registerDAG().