Core
Pluggable execution primitives. Ship through @noocodex/dagonizer/core.
import {
ParallelCombiner,
ParallelCombiners,
FanInStrategy,
FanInStrategies,
} from '@noocodex/dagonizer/core';
import type { ParallelResult, FanInExecution } from '@noocodex/dagonizer/core';ParallelCombiner
Abstract class. Subclass and override combine; register the instance with ParallelCombiners.register.
abstract class ParallelCombiner {
abstract readonly name: string;
abstract combine(
outputs: readonly string[],
results: readonly ParallelResult[],
state: NodeStateInterface,
): string;
}
interface ParallelResult {
readonly opResult: { readonly output: string };
readonly node: { readonly name: string };
}The dispatcher resolves a combiner by name (the placement's combine field) and calls .combine(...) once every concurrent node has reported. Combiners may mutate state (e.g. via state.setMetadata(...)) to expose per-node data to downstream nodes.
Defaults
all-success— returns'success'iff every node reported'success', else'error'.any-success— returns'success'iff any node reported'success', else'error'.collect— writesRecord<nodeName, output>tostate.metadata.parallelOutputsand returns'success'.
ParallelCombiners
Static registry.
class ParallelCombiners {
static register(combiner: ParallelCombiner): void;
static resolve(name: string): ParallelCombiner; // throws DAGError on unknown name
static list(): readonly string[];
}register is last-write-wins on name. resolve throws DAGError when the combiner is not registered.
FanInStrategy
Abstract class. Subclass and override apply; register the instance with FanInStrategies.register.
abstract class FanInStrategy {
abstract readonly name: string;
abstract apply<TState extends NodeStateInterface>(
config: FanInConfig,
execution: FanInExecution<TState>,
): Promise<void>;
}The dispatcher resolves a strategy by name (the FanInConfig.strategy field) and calls .apply(...) once every fan-out item has reported. Strategies mutate execution.state in place; the custom strategy uses execution.invokeNode(name) to dispatch a registered node back through the engine.
FanInExecution
interface FanInExecution<TState extends NodeStateInterface> {
readonly state: TState;
readonly results: ReadonlyMap<string, readonly unknown[]>;
readonly dagName: string;
readonly signal: AbortSignal | null;
readonly accessor: StateAccessor;
invokeNode(nodeName: string): Promise<void>;
}Per-invocation context handed to the strategy. state is the live node state; results carries per-output buckets keyed by the output the worker reported; accessor is the dispatcher's configured StateAccessor.
Defaults
append— flatten every result bucket and append to the path atconfig.target. ThrowsDAGErroriftargetis missing.partition— for each[output, path]inconfig.partitions, append the matching bucket to that path.custom— setsstate.metadata.fanInResultstoObject.fromEntries(execution.results)and invokes the registered node atconfig.customNodeviaexecution.invokeNode.
FanInStrategies
Static registry.
class FanInStrategies {
static register(strategy: FanInStrategy): void;
static resolve(name: string): FanInStrategy; // throws DAGError on unknown name
static list(): readonly string[];
}Same semantics as ParallelCombiners.
Related guides
- DAGBuilder — placements that use
combineandfanIn.strategy - State accessors — strategies receive the dispatcher's
accessor