The Cartographer
The Cartographer is a deterministic data-orchestration pipeline that ingests multi-format satellite tracking feeds, routes each event only through the nodes it needs, and aggregates continent-level insights with GDPR-compliant PII handling — all without an LLM, without a GPU, running entirely in your browser.
It runs on the same @noocodex/dagonizer engine as The Archivist. Only the node domain differs: agent reasoning vs data enrichment. The DAG topology, lifecycle hooks, observer pattern, streaming scatter, and embedded-DAG composition are identical.
Try it live below. Click Run to stream 16 synthetic tracking events through the full pipeline. Watch the DAG pane: nodes light cyan while executing, edges flash when traversed, and branching skips are visible as edges that never fire.
A deterministic multi-source ETL pipeline: CSV facility scans, JSON position pings, and gzip NDJSON sensor readings fan into one canonical model, then every event routes through only the nodes it needs — geo-resolution skipped when the source pre-resolved location, GDPR redaction skipped when PII is absent or not required. The branching is visible live in the DAG pane.
Watch the Panels tab after the run: the before/after panel shows raw GPS coordinates resolved to a real continent/country, and raw PII fields redacted to their pseudonymised forms. The routing savings table shows how many node executions the conditional branching avoided.
The thesis
Data orchestration runs on the same engine. Agentic LLM workflows and deterministic ETL pipelines are both DAGs of typed nodes with state, and the engine does not know or care whether a node calls an LLM, decodes CSV, or runs a haversine formula.
The Cartographer makes the value of the DAG concrete: deterministic conditional routing skips unnecessary work. A position-ping that already carries resolved geo never touches the geo-resolution sub-DAG. An event with no PII never touches the GDPR redaction sub-DAG. The savings are visible in the routing table.
Architecture
One top-level DAG, two scatters, and the sub-DAGs they run (simplified view):
cartographer (top-level)
  phase('seed')                          ← pre-phase: build multi-format source feeds
  scatter('ingest-sources', 'sources')   ← FAN-IN: one run of ingest-source per feed
    └─ ingest-source                     ← per-source: decompress → parse → map → validate
  merge-events                           ← flatten per-source buckets → canonicalEvents
  scatter('process-events', 'canonicalEvents', concurrency=16)   ← STREAMING
    └─ event-pipeline                    ← per-event: BRANCHING enrichment
         ├─ route-geo (skip or run geo-resolve sub-DAG)
         │    └─ geo-resolve       ← reverse-geocode → route-modalities → [ip-geolocate] → fuse-geo
         ├─ normalize → classify → route-kind (geo-only | sensor | order | customs)
         ├─ route-redaction (skip or run gdpr-compliance sub-DAG)
         │    └─ gdpr-compliance   ← consent-gate → classify-pii → redact-pii
         └─ aggregate-event
  summarize → done
The top-level cartographer DAG uses two streaming scatters:
Ingestion fan-in (ingest-sources): four source feeds (CSV, JSON, gzip NDJSON, JSON customs) each run their own ingest-source sub-DAG in an isolated clone. The append gather adds each clone's decoded ingestedEvents array to ingestBuckets as one bucket; merge-events then flattens the buckets into the unified canonicalEvents collection. The shared transform nodes (decompress, parse-csv, parse-json, parse-ndjson, map-fields, coerce-types, validate-event) are reused across every source; the format only changes which subset runs.
Streaming enrichment (process-events): processes the merged canonical events at concurrency 16. Each event clone runs the full event-pipeline branching DAG and produces one compact EnrichedShipment. The append gather collects all enriched records into state.records.
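The scatter-and-gather pattern in both steps can be modelled without the engine. The following is a minimal sketch, not dagonizer's implementation: `scatterAppend` runs a body once per item with at most `concurrency` bodies in flight and gathers one result per item, and `flattenBuckets` plays the role of merge-events. Both names are hypothetical, and the sketch omits the per-item state cloning and the all-success / partial / all-error / empty routing the real scatter performs.

```typescript
// Hypothetical model of a scatter with an `append` gather; not dagonizer's API.

/** Runs `body` once per item with at most `concurrency` calls in flight and
 *  returns one gathered result per item, stored by item index. */
export async function scatterAppend<Item, Gathered>(
  items: readonly Item[],
  concurrency: number,
  body: (item: Item) => Promise<Gathered>,
): Promise<Gathered[]> {
  const results: Gathered[] = new Array<Gathered>(items.length);
  let next = 0;
  // Each worker claims the next unprocessed index until none remain.
  // JavaScript is single-threaded, so `next++` cannot race between workers.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await body(items[index]);
    }
  }
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

/** merge-events analogue: one bucket per source, flattened into one collection. */
export function flattenBuckets<Event>(buckets: ReadonlyArray<ReadonlyArray<Event>>): Event[] {
  const merged: Event[] = [];
  for (const bucket of buckets) {
    merged.push(...bucket);
  }
  return merged;
}
```

In this sketch, results are stored by item index rather than completion order, so the merged collection is deterministic regardless of which body finishes first.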
Branching conditional routing
The event-pipeline DAG routes each event only through the nodes it needs. Two skip conditions are the headline:
route-geo: a position-ping that already carries resolved geo (country, continent, region from the JSON source) takes the has-geo output to apply-geo and never enters the geo-resolve sub-DAG, so neither the offline reverse geocode nor the live IP-geolocation call runs.
route-redaction: an event with no PII fields, one whose consent was already handled, or one whose jurisdiction and consent impose no redaction obligation takes the skip-redaction output straight to aggregate-event and never enters the gdpr-compliance sub-DAG.
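Both skip rules are pure functions of the event, which is what keeps the routing deterministic. The sketch below restates the two conditions from the route-geo and route-redaction nodes as standalone predicates. The flattened input types (`GeoLike`, `RedactionInputs`) are hypothetical simplifications of the state fields those nodes read, and `consentStatus` is passed in rather than computed by `Consent.statusFor`.

```typescript
// Hypothetical, flattened inputs; the real nodes read these from CartographerState.
interface GeoLike {
  country: string;
  region: string;
}

interface RedactionInputs {
  piiFlag: boolean;        // canonical.pii
  recipientName: string;
  recipientEmail: string;
  consentHandled: boolean; // canonical.consentHandled
  consentStatus: string;   // resolved upstream, e.g. 'valid' or 'missing'
  jurisdiction: string;    // geoContext.jurisdiction
}

/** route-geo condition: skip the lookup only when the source resolved a real
 *  location ('UNK' / 'Unmapped' are placeholders, not resolutions). */
export function hasResolvedGeo(geo: GeoLike | undefined): boolean {
  return (
    geo !== undefined &&
    geo.country.length > 0 &&
    geo.country !== 'UNK' &&
    geo.region.length > 0 &&
    geo.region !== 'Unmapped'
  );
}

/** route-redaction condition: skip when there is no PII, consent was already
 *  handled, or a light regime with valid consent imposes no obligation. */
export function shouldSkipRedaction(input: RedactionInputs): boolean {
  const hasPii =
    input.piiFlag || input.recipientName.length > 0 || input.recipientEmail.length > 0;
  const lightRegime =
    input.jurisdiction === 'baseline' || input.jurisdiction === 'international-waters';
  const notRequired = lightRegime && input.consentStatus === 'valid';
  return !hasPii || input.consentHandled || notRequired;
}
```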
Each routing node records its decision on the clone's state.routing object (an EnrichedShipment['routing'] value), and aggregate-event copies it onto the enriched record. The parent's summarize node folds these across all records to produce the savings tally.
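A minimal sketch of that fold, assuming each record's routing value carries the four boolean flags that route-geo and route-redaction set (`geoLookupRun`, `geoLookupSkipped`, `redactionRun`, `redactionSkipped`). The real EnrichedShipment['routing'] also holds other fields such as `geoModalities`, and the `SavingsTally` shape is hypothetical.

```typescript
// Hypothetical subset of EnrichedShipment['routing']: the flags the routing nodes set.
interface RoutingFlags {
  geoLookupRun: boolean;
  geoLookupSkipped: boolean;
  redactionRun: boolean;
  redactionSkipped: boolean;
}

interface SavingsTally {
  records: number;
  geoLookupsRun: number;
  geoLookupsSkipped: number;
  redactionsRun: number;
  redactionsSkipped: number;
}

/** Folds every record's routing decisions into one tally. */
export function tallyRouting(records: ReadonlyArray<{ routing: RoutingFlags }>): SavingsTally {
  return records.reduce<SavingsTally>(
    (tally, { routing }) => ({
      records: tally.records + 1,
      geoLookupsRun: tally.geoLookupsRun + (routing.geoLookupRun ? 1 : 0),
      geoLookupsSkipped: tally.geoLookupsSkipped + (routing.geoLookupSkipped ? 1 : 0),
      redactionsRun: tally.redactionsRun + (routing.redactionRun ? 1 : 0),
      redactionsSkipped: tally.redactionsSkipped + (routing.redactionSkipped ? 1 : 0),
    }),
    { records: 0, geoLookupsRun: 0, geoLookupsSkipped: 0, redactionsRun: 0, redactionsSkipped: 0 },
  );
}
```

Because each clone writes only its own record and the fold runs once, after the gather, there is no shared counter for concurrent clones to race on.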
export const routeGeo: NodeInterface<CartographerState, 'has-geo' | 'needs-geo', CartographerServices> = {
'name': 'route-geo',
'outputs': ['has-geo', 'needs-geo'],
async execute(state, context) {
if (context.signal.aborted) {
throw new Error('Aborted');
}
const geo = state.canonical.geo;
// A source's pre-resolved geo only lets us skip the lookup when it actually
// resolved a location — an 'UNK'/'Unmapped' placeholder (e.g. a ping whose
// coords were out of range at the source) is NOT resolved, so it must run
// the lookup path where validate-coords can reject the bad coords.
const hasResolvedGeo =
geo !== undefined &&
geo.country.length > 0 &&
geo.country !== 'UNK' &&
geo.region.length > 0 &&
geo.region !== 'Unmapped';
if (hasResolvedGeo) {
state.routing = { ...state.routing, 'geoLookupSkipped': true, 'geoLookupRun': false };
return { 'output': 'has-geo' };
}
state.routing = { ...state.routing, 'geoLookupRun': true, 'geoLookupSkipped': false };
return { 'output': 'needs-geo' };
},
};
export const routeRedaction: NodeInterface<CartographerState, 'needs-redaction' | 'skip-redaction', CartographerServices> = {
'name': 'route-redaction',
'outputs': ['needs-redaction', 'skip-redaction'],
async execute(state, context) {
if (context.signal.aborted) {
throw new Error('Aborted');
}
const ev = state.currentEvent;
const hasPii =
state.canonical.pii === true ||
ev.recipientName.length > 0 ||
ev.recipientEmail.length > 0;
const alreadyHandled = state.canonical.consentHandled === true;
const consentStatus = Consent.statusFor(ev.shipmentId, ev.marketingConsent);
const juris = state.geoContext.jurisdiction;
const lightRegime = juris === 'baseline' || juris === 'international-waters';
// Light regime + valid consent imposes no redaction obligation.
const notRequired = lightRegime && consentStatus === 'valid';
const skip = !hasPii || alreadyHandled || notRequired;
if (skip) {
state.routing = { ...state.routing, 'redactionSkipped': true, 'redactionRun': false };
// Set a minimal no-op GdprResult: redaction NOT applied, precise coords
// retained. Marketing analytics eligibility still tracks valid consent.
state.gdprResult = {
...state.gdprResult,
'consentStatus': consentStatus,
'lawfulBasis': state.raw.lawfulBasis,
'jurisdiction': state.geoContext.jurisdiction,
'redactionApplied': false,
'coordsCoarsened': false,
'marketingAnalyticsEligible': consentStatus === 'valid',
};
return { 'output': 'skip-redaction' };
}
state.routing = { ...state.routing, 'redactionRun': true, 'redactionSkipped': false };
return { 'output': 'needs-redaction' };
},
};
The DAGs
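The DAGs on this page exchange state with their embedded sub-DAGs through `inputs` and `outputs` mappings. The GDPR sub-DAG's own comment describes the contract: the child runs on a cloned state, and on completion `outputs` copies named child fields back to the parent (child key → parent path). The following hypothetical model makes that data flow concrete over plain objects. It is not how dagonizer implements `embeddedDAG`; in particular, the orientation of `inputs` and the behaviour when `inputs` is omitted (here: clone the whole parent) are assumptions.

```typescript
// Hypothetical model of embedded-DAG state mapping; not dagonizer's implementation.
type State = Record<string, unknown>;

interface EmbedMapping {
  inputs?: Record<string, string>;  // assumed orientation: childKey -> parentKey
  outputs?: Record<string, string>; // childKey -> parentKey
}

type ChildRun = (child: State) => Promise<'completed' | 'failed'>;

export async function runEmbedded(
  parent: State,
  fresh: () => State,
  mapping: EmbedMapping,
  run: ChildRun,
): Promise<'success' | 'error'> {
  // Seed an isolated child: deep copies of the mapped fields, or (assumed)
  // a deep copy of the whole parent when no inputs are declared.
  let child: State;
  if (mapping.inputs === undefined) {
    child = structuredClone(parent);
  } else {
    child = fresh();
    for (const [childKey, parentKey] of Object.entries(mapping.inputs)) {
      child[childKey] = structuredClone(parent[parentKey]);
    }
  }
  const terminal = await run(child);
  if (terminal !== 'completed') {
    return 'error'; // nothing is copied back from a failed child
  }
  // Copy only the mapped outputs back; every other child write is discarded.
  for (const [childKey, parentKey] of Object.entries(mapping.outputs ?? {})) {
    parent[parentKey] = child[childKey];
  }
  return 'success';
}
```

Any field the child writes that `outputs` does not name is discarded with the child, which is why route-geo's routing record is listed in both `inputs` and `outputs` of the geo-resolve embedding.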
Top-level: cartographer
export const cartographerDAG: DAG = new DAGBuilder('cartographer', '1.0')
// Pre-phase: seeds state.sources = Sources.build(state.eventCount) — the
// multi-format source feeds — before the ingestion scatter reads them.
.phase('seed', 'pre', seedEvents)
// Ingestion FAN-IN: scatter over the source feeds; each runs its ingest-source
// sub-DAG in an isolated clone; the append gather appends each clone's
// state.ingestedEvents array as one bucket of state.ingestBuckets.
.scatter(
'ingest-sources',
'sources',
{ 'dag': 'ingest-source' },
{
'all-success': 'merge-events',
'partial': 'merge-events',
'all-error': 'merge-events',
'empty': 'merge-events',
},
{
'itemKey': 'source',
'concurrency': 4,
'gather': {
'strategy': 'append',
'field': 'ingestedEvents',
'target': 'ingestBuckets',
},
},
)
// merge-events: flatten the per-source buckets into one canonicalEvents model.
.node('merge-events', mergeEvents, {
'merged': 'process-events',
})
// Streaming enrichment: scatter over the merged canonical events at
// concurrency 16. Each clone's state.enriched is appended into state.records.
.scatter(
'process-events',
'canonicalEvents',
{ 'dag': 'event-pipeline' },
{
'all-success': 'summarize',
'partial': 'summarize',
'all-error': 'summarize',
'empty': 'summarize',
},
{
'itemKey': 'canonical-event',
'concurrency': 16,
'gather': {
'strategy': 'append',
'field': 'enriched',
'target': 'records',
},
},
)
// Fold gathered records into the fixed-size regional + per-journey insights.
.node('summarize', summarizeInsights, {
'success': 'done',
})
.terminal('done', 'completed')
.build();
Branching enrichment: event-pipeline
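Every lane in this DAG converges on enrich-leg, which measures the distance from the leg origin (legFromLat / legFromLng) to the current scan. The thesis names the haversine formula as an example of deterministic node work; assuming enrich-leg uses it, a minimal sketch on a spherical Earth with a mean radius of 6371 km (the real node may use a different radius or formula) is:

```typescript
const EARTH_RADIUS_KM = 6371; // mean Earth radius; spherical approximation

function toRadians(degrees: number): number {
  return (degrees * Math.PI) / 180;
}

/** Great-circle distance in km between two lat/lng points (haversine). */
export function haversineKm(lat1: number, lng1: number, lat2: number, lng2: number): number {
  const dLat = toRadians(lat2 - lat1);
  const dLng = toRadians(lng2 - lng1);
  const a =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(toRadians(lat1)) * Math.cos(toRadians(lat2)) * Math.sin(dLng / 2) ** 2;
  // Clamp so floating-point drift slightly above 1 cannot make asin return NaN.
  return 2 * EARTH_RADIUS_KM * Math.asin(Math.min(1, Math.sqrt(a)));
}
```

For example, one degree of longitude along the equator comes out at about 111.2 km.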
/**
* event-pipeline (BRANCHING — the headline): each event routes ONLY through the
* nodes it needs. Four embedded sub-DAGs compose the domain logic; the parent is
* a thin orchestrator. Three conditional branches + per-kind enrichment lanes,
* all converging on aggregate-event:
*
*   parse ─invalid→ rejected
*     └parsed→ route-geo
*                ├has-geo→ apply-geo ──────────────────┐  (SKIP geo-resolve lookup)
*                └needs-geo→ validate-coords           │
*                            ├rejected→ rejected       │
*                            └valid→ [geo-resolve] ────┘  (embedded: reverse-geocode+ip+fuse)
*                                                      ▼  (converge)
*                                                [canonicalize]  (embedded: normalize → classify)
*                                                      ▼
*                                                  route-kind
*     ┌────────────────────────────────────────────────┘
*     ├geo-only (position-ping)→ enrich-leg
*     ├sensor (sensor-reading)→ cold-chain-check → enrich-leg
*     ├order (facility-scan / delivery-confirmation)→
*     │     [order-enrichment] (embedded: pricing→shipping→eta) → enrich-leg
*     └customs (customs-event)→ customs-dwell → enrich-leg
*                                                      ▼  (converge)
*                                               route-redaction
*                                                 ├needs-redaction→ [gdpr] → aggregate-event
*                                                 └skip-redaction→ aggregate-event (direct bypass)
*                                               aggregate-event ─done→ done
*
* Each routing decision is recorded on the clone's state.routing (RAN vs
* SKIPPED) and copied onto the enriched record so summarize totals the savings.
*/
export const eventPipelineDAG: DAG = new DAGBuilder('event-pipeline', '1.0')
// 1. parse: adapt the canonical event (from metadata) into state.raw.
.node('parse', parseEvent, {
'parsed': 'route-geo',
'invalid': 'rejected',
})
// 2. route-geo: SKIP the geo lookup when the source pre-resolved location.
.node('route-geo', routeGeo, {
'has-geo': 'apply-geo',
'needs-geo': 'validate-coords',
})
// 2a. apply-geo (skip path): materialise GeoContext from carried geo.
.node('apply-geo', applyGeo, {
'normalize': 'canonicalize',
})
// 2b. validate-coords (lookup path): WGS-84 bounds check on the scan coords.
.node('validate-coords', validateCoords, {
'valid': 'geo-resolve',
'rejected': 'rejected',
})
// 2c. geo-resolve: embedded multi-modal geo-resolution sub-DAG:
// reverse-geocode (offline) → route-modalities → ip-geolocate (real HTTP,
// only when a gateway IP is present) → fuse-geo. Writes state.geoContext +
// state.resolvedGeo. This is the work route-geo SKIPS when geo is
// pre-resolved (no lookup of either modality runs).
.embeddedDAG<CartographerState, CartographerState>('geo-resolve', 'geo-resolve', {
'success': 'canonicalize',
'error': 'canonicalize',
}, {
'inputs': {
// Seed the child with the fields the geo nodes read + the routing record
// route-geo already started, so the child's geo-call flags accumulate onto it.
'raw': 'raw',
'canonical': 'canonical',
'routing': 'routing',
},
'outputs': {
'geoContext': 'geoContext',
'resolvedGeo': 'resolvedGeo',
'routing': 'routing',
},
})
// 3. canonicalize: embedded sub-DAG that normalizes scalars and classifies the
// event. Runs AFTER geo so normalize has the timezone from state.geoContext.
// normalize (scalar canonicalization + local time) → classify (eventType/tiers)
.embeddedDAG<CartographerState, CartographerState>('canonicalize', 'canonicalize', {
'success': 'route-kind',
'error': 'rejected',
}, {
'inputs': {
'raw': 'raw',
'geoContext': 'geoContext',
},
'outputs': {
'normalized': 'normalized',
'currentEvent': 'currentEvent',
},
})
// 4. route-kind: per-kind enrichment dispatch (skip irrelevant work).
.node('route-kind', routeKind, {
'geo-only': 'enrich-leg',
'sensor': 'cold-chain-check',
'order': 'order-enrichment',
'customs': 'customs-dwell',
})
// 4a. cold-chain-check (sensor lane): temp/shock breach evaluation.
.node('cold-chain-check', coldChainCheck, {
'checked': 'enrich-leg',
})
// 4b. customs-dwell (customs lane): clearance dwell hours.
.node('customs-dwell', customsDwell, {
'dwelled': 'enrich-leg',
})
// 4c. order-enrichment (order lane): embedded sub-DAG for value enrichment:
// enrich-pricing → enrich-shipping → enrich-eta.
.embeddedDAG<CartographerState, CartographerState>('order-enrichment', 'order-enrichment', {
'success': 'enrich-leg',
'error': 'enrich-leg',
}, {
'inputs': {
'normalized': 'normalized',
},
'outputs': {
'pricedOrder': 'pricedOrder',
'shippingQuote': 'shippingQuote',
'deliveryEstimate': 'deliveryEstimate',
},
})
// 5. enrich-leg: legFrom → scan distance (every lane converges here).
.node('enrich-leg', enrichLeg, {
'leg-measured': 'route-redaction',
})
// 6. route-redaction: SKIP the redaction sub-DAG when not required.
// skip-redaction routes directly to aggregate-event (no intermediate node).
.node('route-redaction', routeRedaction, {
'needs-redaction': 'gdpr',
'skip-redaction': 'aggregate-event',
})
// 6a. gdpr (run path): embedded gdpr-compliance sub-DAG.
.embeddedDAG('gdpr', 'gdpr-compliance', {
'success': 'aggregate-event',
'error': 'gdpr-violation',
}, {
'outputs': {
'currentEvent': 'currentEvent',
'gdprResult': 'gdprResult',
},
})
// 7. aggregate-event: write compact EnrichedShipment to state.enriched.
.node('aggregate-event', aggregateEvent, {
'done': 'done',
})
// Terminals
.terminal('done', 'completed')
.terminal('rejected', 'failed')
.terminal('gdpr-violation', 'failed')
.build();
Ingestion sub-DAG: ingest-source
The per-source scatter body. The select-source routing node sends each feed, by format, to a per-format embedded sub-DAG, and each of those composes only the shared transform nodes its format needs.
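For the gzip NDJSON feed, decompress has real work to do before parse-ndjson can split the payload into records. A Node.js sketch of those two steps using `gunzipSync` from `node:zlib`; the function names are hypothetical stand-ins for the real nodes, and a browser build would need a browser-side decompressor (for example the `DecompressionStream` API) instead of `node:zlib`.

```typescript
import { gunzipSync, gzipSync } from 'node:zlib';

/** decompress analogue: gunzip the payload bytes into UTF-8 text. */
export function decompressText(payload: Uint8Array): string {
  return gunzipSync(payload).toString('utf8');
}

/** parse-ndjson analogue: one JSON object per non-blank line. */
export function parseNdjson(text: string): Array<Record<string, unknown>> {
  const records: Array<Record<string, unknown>> = [];
  text.split(/\r?\n/).forEach((line, index) => {
    if (line.trim().length === 0) {
      return; // tolerate blank and trailing lines
    }
    const value: unknown = JSON.parse(line);
    if (typeof value !== 'object' || value === null || Array.isArray(value)) {
      throw new Error(`line ${index + 1}: expected a JSON object`);
    }
    records.push(value as Record<string, unknown>);
  });
  return records;
}

/** Helper for trying the two steps: encode records as a gzip NDJSON payload. */
export function encodeNdjsonGz(records: ReadonlyArray<Record<string, unknown>>): Uint8Array {
  return gzipSync(records.map((r) => JSON.stringify(r)).join('\n'));
}
```

In this sketch, throwing on a malformed line leaves it to the caller to decide how the source fails.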
/**
* IngestSourceDAG: thin per-source router — the scatter body of the top-level
* ingestion fan-in. One run per source feed; routes by format to the correct
* per-format embedded sub-DAG, then threads state.ingestedEvents back to the
* parent so the `append` gather works.
*
*   select-source ─(json)──► [ingest-json]       ──► ingested
*                 ─(csv)───► [ingest-csv]        ──► ingested
*                 ─(gz)────► [ingest-ndjson-gz]  ──► ingested
*                 ─(invalid)───────────────────────► rejected
*
* Each embedded sub-DAG composes the SHARED ingest nodes for its format and
* ends at its own `ingested` terminal. The output mapping threads
* state.ingestedEvents back so the parent scatter's `append` gather
* concatenates each source bucket into state.ingestBuckets.
*
* Terminals: ingested (completed), rejected (failed — unselectable / bad payload).
*/
// #region ingest-source-dag
import { selectSource } from '../nodes/ingest/selectSource.ts';
import { decompress } from '../nodes/ingest/decompress.ts';
import { parseCsv } from '../nodes/ingest/parseCsv.ts';
import { parseJson } from '../nodes/ingest/parseJson.ts';
import { parseNdjson } from '../nodes/ingest/parseNdjson.ts';
import { mapFields } from '../nodes/ingest/mapFields.ts';
import { coerceTypes } from '../nodes/ingest/coerceTypes.ts';
import { validateEvent } from '../nodes/ingest/validateEvent.ts';
import { ingestJsonDAG } from './IngestJsonDAG.ts';
import { ingestCsvDAG } from './IngestCsvDAG.ts';
import { ingestNdjsonGzDAG } from './IngestNdjsonGzDAG.ts';
import type { CartographerState } from '../CartographerState.ts';
import type { CartographerServices } from '../CartographerServices.ts';
import type { DispatcherBundle } from '@noocodex/dagonizer';
import { DAGBuilder } from '@noocodex/dagonizer/builder';
import type { DAG } from '@noocodex/dagonizer/entities';
export const ingestSourceDAG: DAG = new DAGBuilder('ingest-source', '1.0')
// 1. select-source: read source feed from metadata; route by format.
.node('select-source', selectSource, {
'json': 'json',
'csv': 'csv',
'gz': 'gz',
'invalid': 'rejected',
})
// 2a. json: embedded sub-DAG for JSON array sources.
.embeddedDAG<CartographerState, CartographerState>('json', 'ingest-json', {
'success': 'ingested',
'error': 'rejected',
}, {
'outputs': {
'ingestedEvents': 'ingestedEvents',
},
})
// 2b. csv: embedded sub-DAG for CSV sources.
.embeddedDAG<CartographerState, CartographerState>('csv', 'ingest-csv', {
'success': 'ingested',
'error': 'rejected',
}, {
'outputs': {
'ingestedEvents': 'ingestedEvents',
},
})
// 2c. gz: embedded sub-DAG for gzip NDJSON sources.
.embeddedDAG<CartographerState, CartographerState>('gz', 'ingest-ndjson-gz', {
'success': 'ingested',
'error': 'rejected',
}, {
'outputs': {
'ingestedEvents': 'ingestedEvents',
},
})
// Terminals
.terminal('ingested', 'completed')
.terminal('rejected', 'failed')
.build();
export const ingestSourceBundle: DispatcherBundle<CartographerState, CartographerServices> = {
'nodes': [selectSource, decompress, parseCsv, parseJson, parseNdjson, mapFields, coerceTypes, validateEvent],
'dags': [ingestSourceDAG, ingestJsonDAG, ingestCsvDAG, ingestNdjsonGzDAG],
};
// #endregion ingest-source-dag
Geo-resolution sub-DAG: geo-resolve
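fuse-geo is the fan-in point where the GPS and IP candidates meet. Its body is not shown on this page, so the following is a hypothetical fusion rule for illustration only: prefer the GPS candidate, raise confidence when the IP candidate agrees on the country, lower it when they disagree, and record which modalities agreed. The `Candidate` and `Fused` shapes and every confidence value are assumptions.

```typescript
// Hypothetical fusion rule for illustration; the real fuse-geo may differ.
type Modality = 'gps' | 'ip';

interface Candidate {
  modality: Modality;
  resolved: boolean;
  country: string; // '' when unresolved
}

interface Fused {
  country: string;
  confidence: number;     // 0..1, illustrative values only
  modalities: Modality[]; // modalities that agree with the chosen country
}

export function fuseCandidates(gps: Candidate, ip: Candidate): Fused {
  // GPS wins when it resolved; IP is the fallback.
  const primary = gps.resolved ? gps : ip.resolved ? ip : undefined;
  if (primary === undefined) {
    return { country: '', confidence: 0, modalities: [] };
  }
  const chosen = primary.country;
  const agreeing = [gps, ip]
    .filter((c) => c.resolved && c.country === chosen)
    .map((c) => c.modality);
  let confidence: number;
  if (agreeing.length === 2) {
    confidence = 0.95; // both modalities agree
  } else if (gps.resolved && ip.resolved) {
    confidence = 0.5; // they disagree: keep GPS, flag low confidence
  } else {
    confidence = primary.modality === 'gps' ? 0.8 : 0.6; // single modality
  }
  return { country: chosen, confidence, modalities: agreeing };
}
```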
/**
* GeoResolveDAG: the multi-modal geo-resolution sub-DAG (composed of DISTINCT
* nodes, NOT a monolithic resolver). Each modality has its own resolver (an
* offline reverse geocoder for GPS, a real IP-geolocation API for IP); the
* fan-in node fuses them:
*
*   reverse-geocode  (GPS modality, always runs; offline country-coder)
*     └─► route-modalities
*           ├─ip──► ip-geolocate ──────┐    (a real freeipapi.com call)
*           └─gps-only────────────┐    │    (skip the IP call)
*                                 ▼    ▼
*                               fuse-geo (FAN-IN)
*                                 └─► resolved
*
* reverse-geocode, ip-geolocate, and fuse-geo are SEPARATE nodes. The GPS modality
* is resolved OFFLINE (deterministic, no network); the IP modality is conditionally
* executed (route-modalities skips it for GPS-only signals → a real IP call avoided).
* fuse-geo combines the two candidates into state.geoContext with a confidence + the
* modalities that agreed.
*
* Embedded in event-pipeline via route-geo's 'needs-geo' branch; route-geo skips
* this whole sub-DAG (the geo nodes) when the source pre-resolved geo.
*
* The transports (ReverseGeocoder / IpGeolocator) are injected via the services
* bag — the GPS transport is always the offline country-coder; the IP transport is
* Live (real HTTP) online or Recorded (fixture replay) for the smoke test.
*/
// #region geo-resolve-dag
import { reverseGeocode } from '../nodes/geo/reverseGeocode.ts';
import { routeModalities } from '../nodes/geo/routeModalities.ts';
import { ipGeolocate } from '../nodes/geo/ipGeolocate.ts';
import { fuseGeo } from '../nodes/geo/fuseGeo.ts';
import type { CartographerState } from '../CartographerState.ts';
import type { CartographerServices } from '../CartographerServices.ts';
import type { DispatcherBundle } from '@noocodex/dagonizer';
import { DAGBuilder } from '@noocodex/dagonizer/builder';
import type { DAG } from '@noocodex/dagonizer/entities';
export const geoResolveDAG: DAG = new DAGBuilder('geo-resolve', '1.0')
// 1. reverse-geocode: GPS modality (offline country-coder). Always runs.
.node('reverse-geocode', reverseGeocode, {
'geocoded': 'route-modalities',
})
// 2. route-modalities: run the IP modality only when a gateway IP is present.
.node('route-modalities', routeModalities, {
'ip': 'ip-geolocate',
'gps-only': 'fuse-geo',
})
// 3. ip-geolocate: IP modality (a real freeipapi.com call). Conditional.
.node('ip-geolocate', ipGeolocate, {
'geolocated': 'fuse-geo',
})
// 4. fuse-geo: FAN-IN the two modality candidates → ResolvedGeo + geoContext.
.node('fuse-geo', fuseGeo, {
'fused': 'resolved',
})
.terminal('resolved', 'completed')
.build();
export const geoResolveBundle: DispatcherBundle<CartographerState, CartographerServices> = {
'nodes': [reverseGeocode, routeModalities, ipGeolocate, fuseGeo],
'dags': [geoResolveDAG],
};
// #endregion geo-resolve-dag
GDPR compliance sub-DAG: gdpr-compliance
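The before/after panel shows PII fields redacted to pseudonymised forms, but the rules inside `GdprRedactor.redact` are not shown on this page. One common pseudonymisation technique is keyed hashing: the same value and key always yield the same token, so records stay joinable without carrying the raw value. A hypothetical sketch using HMAC-SHA-256 from `node:crypto`; the token format, the prefixes, and the email and phone normalisation are assumptions.

```typescript
import { createHmac } from 'node:crypto';

interface PiiSample {
  recipientName: string;
  recipientEmail: string;
  recipientPhone: string;
}

/** Keyed pseudonym: a stable token per (value, key) that does not contain the value. */
export function pseudonymise(value: string, key: string, prefix: string): string {
  if (value.length === 0) {
    return ''; // nothing to redact
  }
  const digest = createHmac('sha256', key).update(value, 'utf8').digest('hex');
  return `${prefix}_${digest.slice(0, 12)}`;
}

/** Pseudonymises the three fields an enriched record keeps as its redactedSample. */
export function redactSample(sample: PiiSample, key: string): PiiSample {
  return {
    recipientName: pseudonymise(sample.recipientName, key, 'name'),
    // Normalise first so trivially different spellings map to the same token.
    recipientEmail: pseudonymise(sample.recipientEmail.trim().toLowerCase(), key, 'email'),
    recipientPhone: pseudonymise(sample.recipientPhone.replace(/\D/g, ''), key, 'phone'),
  };
}
```

Under GDPR, pseudonymised data still counts as personal data (Recital 26), so the key needs the same protection as the raw values.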
/**
* GdprComplianceDAG: reusable GDPR compliance sub-pipeline.
*
* Internal flow:
*
*   consent-gate
*     └─ classify ──► classify-pii
*   classify-pii
*     └─ redact ──► redact-pii
*   redact-pii
*     ├─ ok ──► compliant (TerminalNode completed → parent routes 'success')
*     └─ violation ──► violation (TerminalNode failed → parent routes 'error')
*
* Embedded via:
*   .embeddedDAG('gdpr', 'gdpr-compliance',
*     { 'success':'aggregate-event', 'error':'gdpr-violation' },
*     { 'outputs': { 'currentEvent':'currentEvent', 'gdprResult':'gdprResult' } })
*
* The embedded DAG runs in a CLONED child state. Its nodes redact PII on
* state.currentEvent and write state.gdprResult; the `outputs` mapping copies
* those two fields back into the parent event-pipeline clone when the
* sub-DAG completes (child-key → parent-path orientation).
*/
// #region gdpr-compliance-dag
import { consentGate, classifyPii, redactPii } from '../nodes/gdprNodes.ts';
import type { CartographerState } from '../CartographerState.ts';
import type { CartographerServices } from '../CartographerServices.ts';
import type { DispatcherBundle } from '@noocodex/dagonizer';
import { DAGBuilder } from '@noocodex/dagonizer/builder';
import type { DAG } from '@noocodex/dagonizer/entities';
export const gdprComplianceDAG: DAG = new DAGBuilder('gdpr-compliance', '1.0')
// ── 1. consent-gate ──────────────────────────────────────────────────────
// Resolves the consent status from marketingConsent + simulated expiry.
// Always routes 'classify' (both consented and non-consented proceed;
// the consent status drives redaction rules downstream).
.node('consent-gate', consentGate, {
'classify': 'classify-pii',
})
// ── 2. classify-pii ──────────────────────────────────────────────────────
// Records which fields are personal/sensitive; no routing decision yet.
.node('classify-pii', classifyPii, {
'redact': 'redact-pii',
})
// ── 3. redact-pii ────────────────────────────────────────────────────────
// Applies GdprRedactor.redact. Routes to 'compliant' (ok) or 'violation'.
.node('redact-pii', redactPii, {
'ok': 'compliant',
'violation': 'violation',
})
// ── Terminals ─────────────────────────────────────────────────────────────
.terminal('compliant', 'completed')
.terminal('violation', 'failed')
.build();
// #endregion gdpr-compliance-dag
export const gdprComplianceBundle: DispatcherBundle<CartographerState, CartographerServices> = {
'nodes': [consentGate, classifyPii, redactPii],
'dags': [gdprComplianceDAG],
};
State and services
CartographerState
The mutable clipboard threaded through every node. Top-level fields hold the source feeds, ingested events, gathered records, and insights aggregates. Clone fields hold the per-event enrichment pipeline's intermediate values.
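Scatter clones run concurrently on copies of this state, which is why `clone()` below copies every nested array and object instead of sharing references: a shared `countries` array, for instance, would let one clone's writes leak into its siblings. A stripped-down, hypothetical illustration of the same rule with two fields and no engine base class:

```typescript
// Hypothetical two-field state; the real CartographerState extends NodeStateBase.
class MiniState {
  geoModalities: string[] = [];
  geoContext: { country: string; countries: string[] } = { country: '', countries: [] };

  clone(): MiniState {
    const copy = new MiniState();
    // Copy arrays and nested objects so the clone owns its own containers.
    copy.geoModalities = [...this.geoModalities];
    copy.geoContext = { ...this.geoContext, countries: [...this.geoContext.countries] };
    return copy;
  }
}
```

A spread alone (`{ ...this.geoContext }`) would still share the inner `countries` array, which is why the real `clone()` also spreads nested arrays such as `countries` and `waterBodies`.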
export class CartographerState extends NodeStateBase {
/** Number of synthetic journeys to generate. */
eventCount: number = 200;
/**
* The multi-format source feeds, seeded by the seed phase node. Each is a
* `{ sourceId, format, mappingKey, kind, payload }` — a different on-the-wire
* encoding (JSON / CSV / gzip NDJSON) of a partition of the raw scan feed.
*/
sources: SourcePayload[] = [];
/**
* Ingestion fan-in buckets: the `append` gather of the ingestion scatter
* appends each source clone's `ingestedEvents` array as one element here, so
* this is one bucket per source. The `merge-events` node flattens it into the
* unified `canonicalEvents` collection.
*/
ingestBuckets: CanonicalEvent[][] = [];
/**
* The unified canonical event collection. Every source's decoded events are
* flattened into this one array (from `ingestBuckets`); the enrichment scatter
* then reads it.
*/
canonicalEvents: CanonicalEvent[] = [];
// ── Per-source ingest slots (used inside a source's ingest sub-DAG clone) ──
/** The source feed currently being ingested (set from `sources` by select-source). */
currentSource: SourcePayload = {
'sourceId': '',
'format': 'json',
'mappingKey': 'json-position',
'kind': 'position-ping',
'payload': '',
};
/** Decompressed/raw text of the current source (after `decompress`). */
decodedText: string = '';
/** Records parsed from the decoded text (after parse-csv/json/ndjson). */
parsedRecords: Array<Record<string, unknown>> = [];
/** Records with source field names mapped to canonical fields (after map-fields). */
mappedRecords: Array<Record<string, unknown>> = [];
/** Canonical events validated from this source (after coerce-types + validate-event). */
ingestedEvents: CanonicalEvent[] = [];
/** The single canonical event under enrichment in a scatter clone (set by parse). */
canonical: CanonicalEvent = {
'shipmentId': '',
'eventId': '',
'epochMs': 0,
'kind': 'position-ping',
'sourceId': '',
'sourceFormat': 'json',
'body': {
'scanSeq': 0,
'latitude': 0,
'longitude': 0,
'ipAddress': '',
'legFromLat': 0,
'legFromLng': 0,
'originLat': 0,
'originLng': 0,
'destLat': 0,
'destLng': 0,
'carrier': '',
'facilityId': '',
'status': '',
'weight': 0,
'weightUnit': 'kg',
'lineItems': [],
'rawTimestamp': '',
'rawDispatchAt': '',
'rawPromisedDeliveryAt': '',
'disruptionReason': '',
'tempC': 0,
'humidityPct': 0,
'shockG': 0,
'customsStatus': '',
'delivered': false,
'recipientName': '',
'recipientEmail': '',
'recipientPhone': '',
'recipientAddress': '',
'recipientCountry': '',
'marketingConsent': false,
'lawfulBasis': 'contract',
'specialCategory': 'none',
},
};
/** Enriched shipment records gathered from scatter clones. */
records: EnrichedShipment[] = [];
/** Fixed-size regional insights aggregate produced by summarizeInsights. */
insights: Map<string, RegionInsights> = new Map();
/** Per-journey aggregate (grouped by shipmentId) produced by summarizeInsights. */
journeys: Map<string, JourneyInsights> = new Map();
/** Raw scan from scatter metadata (set by parseEvent). */
raw: RawShipmentEvent = {
'shipmentId': '',
'scanSeq': 0,
'rawTimestamp': '',
'rawDispatchAt': '',
'rawStatus': '',
'carrier': '',
'ipAddress': '',
'latitude': 0,
'longitude': 0,
'legFromLat': 0,
'legFromLng': 0,
'originLat': 0,
'originLng': 0,
'destLat': 0,
'destLng': 0,
'weight': 0,
'weightUnit': 'kg',
'recipientName': '',
'recipientEmail': '',
'recipientPhone': '',
'recipientAddress': '',
'recipientCountry': '',
'marketingConsent': false,
'rawPromisedDeliveryAt': '',
'lineItems': [{ 'productId': '', 'quantity': 1 }],
'facilityId': '',
'lawfulBasis': 'contract',
'specialCategory': 'none',
'disruptionReason': '',
};
/** Normalised canonical form (set by normalize node). */
normalized: NormalizedShipment = {
'shipmentId': '',
'scanSeq': 0,
'epochMs': 0,
'dispatchEpochMs': 0,
'isoTimestamp': '',
'localIso': '',
'utcOffset': '',
'carrierId': '',
'carrierName': '',
'countryIso3': 'UNK',
'weightGrams': 0,
'eventType': 'SCAN',
'serviceTier': 'standard',
'sizeTier': 'small',
'lineItems': [{ 'productId': '', 'quantity': 1 }],
'facilityId': '',
'latitude': 0,
'longitude': 0,
'legFromLat': 0,
'legFromLng': 0,
'originLat': 0,
'originLng': 0,
'destLat': 0,
'destLng': 0,
'recipientName': '',
'recipientEmail': '',
'recipientPhone': '',
'recipientAddress': '',
'recipientCountry': '',
'marketingConsent': false,
'promisedEpochMs': 0,
'disruptionHours': 0,
'disruptionReason': '',
};
/**
* ShipmentEvent-shaped current event used by geo and GDPR nodes.
* Populated from normalized by the classify node.
*/
currentEvent: ShipmentEvent = {
'shipmentId': '',
'timestamp': '',
'eventType': 'SCAN',
'latitude': 0,
'longitude': 0,
'carrier': '',
'facilityId': '',
'recipientName': '',
'recipientEmail': '',
'recipientPhone': '',
'recipientAddress': '',
'recipientCountry': '',
'marketingConsent': false,
'promisedDeliveryAt': '',
};
/** Geo-enrichment result for the current scan (incl. timezone + jurisdiction). */
geoContext: GeoContext = {
'gridZone': '',
'country': '',
'continent': 'Unmapped',
'countries': [],
'region': '',
'hub': '',
'status': 'unmapped',
'waterBodies': [],
'timezone': 'UTC',
'jurisdiction': 'baseline',
};
/** Basket pricing result (set by enrich-pricing node). */
pricedOrder: PricedOrder = {
'lines': [],
'subtotalMinor': 0,
'currency': 'USD',
'subtotalUsdMinor': 0,
'fxRate': 1.0,
};
/** Shipping cost + distance (set by enrich-shipping node). */
shippingQuote: ShippingQuote = {
'distanceKm': 0,
'costUsdMinor': 0,
'breakdown': {
'baseMinor': 0,
'perKmMinor': 0,
'perKgMinor': 0,
'tierMultiplier': 1.0,
},
};
/** ETA calculation (set by enrich-eta node). */
deliveryEstimate: DeliveryEstimate = {
'transitHours': 0,
'etaEpochMs': 0,
'etaIso': '',
'promisedEpochMs': 0,
'onTime': false,
'delayHours': 0,
};
/** Leg distance (legFrom → this scan) in km, set by enrich-leg node. */
legKm: number = 0;
/** Cold-chain breach flag (sensor lane only; set by cold-chain-check). */
coldChainBreach: boolean = false;
/** Customs clearance dwell hours (customs lane only; set by customs-dwell). */
customsDwellHours: number = 0;
/**
* This scan's conditional-routing decisions (the branching headline). Each
* routing node records what RAN vs was SKIPPED here; aggregate-event copies it
* onto the enriched record so the parent's summarize totals the savings (no
* shared mutable counters across scatter clones).
*/
routing: EnrichedShipment['routing'] = CartographerState.defaultRouting();
/** GPS-modality candidate from reverse-geocode (set by the geo-resolve sub-DAG). */
gpsCandidate: GeoCandidate = CartographerState.unresolvedCandidate('gps');
/** IP-modality candidate from ip-geolocate (unresolved when that node skipped). */
ipCandidate: GeoCandidate = CartographerState.unresolvedCandidate('ip');
/** The fused multi-modal location (set by fuse-geo). */
resolvedGeo: ResolvedGeo = {
'country': '',
'countryName': '',
'continent': 'Unmapped',
'region': '',
'locality': '',
'lat': 0,
'lng': 0,
'status': 'land',
'jurisdiction': 'baseline',
'confidence': 0,
'modalities': [],
};
/** GDPR processing result for the current scan (location + consent driven). */
gdprResult: GdprResult = {
'personalDataFields': [],
'sensitiveDataFields': [],
'consentStatus': 'missing',
'lawfulBasis': 'contract',
'jurisdiction': 'baseline',
'strictness': 'light',
'complianceScore': 0,
'retention': { 'retainUntil': '', 'autoDelete': false },
'redactionApplied': false,
'marketingAnalyticsEligible': false,
'coordsCoarsened': false,
};
/** Compact enriched per-scan record written by aggregate-event; parent gather appends it. */
enriched: EnrichedShipment = {
'shipmentId': '',
'scanSeq': 0,
'epochMs': 0,
'localIso': '',
'utcOffset': '',
'timezone': 'UTC',
'jurisdiction': 'baseline',
'continent': 'Unmapped',
'region': '',
'country': '',
'hub': '',
'status': 'unmapped',
'lat': 0,
'lng': 0,
'coordsCoarsened': false,
'legKm': 0,
'eventType': 'SCAN',
'serviceTier': 'standard',
'sizeTier': 'small',
'onTime': false,
'exception': false,
'consentStatus': 'missing',
'disruptionReason': '',
'subtotalUsdMinor': 0,
'currency': 'USD',
'shippingUsdMinor': 0,
'distanceKm': 0,
'transitHours': 0,
'delayHours': 0,
'redactionApplied': false,
'redactedSample': { 'recipientName': '', 'recipientEmail': '', 'recipientPhone': '' },
'routing': CartographerState.defaultRouting(),
};
override clone(): CartographerState {
const copy = new CartographerState();
copy.eventCount = this.eventCount;
copy.sources = this.sources.map((s) => ({ ...s }));
copy.ingestBuckets = this.ingestBuckets.map((bucket) => bucket.map((e) => CartographerState.cloneCanonical(e)));
copy.canonicalEvents = this.canonicalEvents.map((e) => CartographerState.cloneCanonical(e));
copy.records = [...this.records];
copy.insights = new Map(this.insights);
copy.journeys = new Map(this.journeys);
copy.currentSource = { ...this.currentSource };
copy.decodedText = this.decodedText;
copy.parsedRecords = this.parsedRecords.map((r) => ({ ...r }));
copy.mappedRecords = this.mappedRecords.map((r) => ({ ...r }));
copy.ingestedEvents = this.ingestedEvents.map((e) => CartographerState.cloneCanonical(e));
copy.canonical = CartographerState.cloneCanonical(this.canonical);
copy.raw = {
...this.raw,
'lineItems': this.raw.lineItems.map((li) => ({ ...li })),
};
copy.normalized = {
...this.normalized,
'lineItems': this.normalized.lineItems.map((li) => ({ ...li })),
};
copy.currentEvent = { ...this.currentEvent };
copy.geoContext = {
...this.geoContext,
'countries': [...this.geoContext.countries],
'waterBodies': [...this.geoContext.waterBodies],
};
copy.pricedOrder = {
...this.pricedOrder,
'lines': this.pricedOrder.lines.map((l) => ({ ...l })),
};
copy.shippingQuote = {
...this.shippingQuote,
'breakdown': { ...this.shippingQuote.breakdown },
};
copy.deliveryEstimate = { ...this.deliveryEstimate };
copy.legKm = this.legKm;
copy.coldChainBreach = this.coldChainBreach;
copy.customsDwellHours = this.customsDwellHours;
copy.gpsCandidate = { ...this.gpsCandidate };
copy.ipCandidate = { ...this.ipCandidate };
copy.resolvedGeo = { ...this.resolvedGeo, 'modalities': [...this.resolvedGeo.modalities] };
copy.routing = { ...this.routing, 'geoModalities': [...this.routing.geoModalities] };
copy.gdprResult = {
...this.gdprResult,
'personalDataFields': [...this.gdprResult.personalDataFields],
'sensitiveDataFields': [...this.gdprResult.sensitiveDataFields],
'retention': { ...this.gdprResult.retention },
};
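copy.enriched = {
...this.enriched,
'redactedSample': { ...this.enriched.redactedSample },
// routing is nested too: copy it so the clone never shares geoModalities with the original.
'routing': { ...this.enriched.routing, 'geoModalities': [...this.enriched.routing.geoModalities] },
};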
return copy;
}
protected override snapshotData(): JsonObject {
return {
'eventCount': this.eventCount,
'sources': this.sources.map((s) => CartographerState.sourceToJson(s)),
'ingestBuckets': this.ingestBuckets.map((bucket) => bucket.map((e) => CartographerState.canonicalToJson(e))),
'canonicalEvents': this.canonicalEvents.map((e) => CartographerState.canonicalToJson(e)),
'canonical': CartographerState.canonicalToJson(this.canonical),
'records': this.records.map((r) => CartographerState.enrichedToJson(r)),
'raw': CartographerState.rawToJson(this.raw),
'normalized': CartographerState.normalizedToJson(this.normalized),
'currentEvent': CartographerState.eventToJson(this.currentEvent),
'geoContext': {
'gridZone': this.geoContext.gridZone,
'country': this.geoContext.country,
'continent': this.geoContext.continent,
'countries': [...this.geoContext.countries],
'region': this.geoContext.region,
'hub': this.geoContext.hub,
'status': this.geoContext.status,
'waterBodies': [...this.geoContext.waterBodies],
'timezone': this.geoContext.timezone,
'jurisdiction': this.geoContext.jurisdiction,
},
'pricedOrder': CartographerState.pricedOrderToJson(this.pricedOrder),
'shippingQuote': {
'distanceKm': this.shippingQuote.distanceKm,
'costUsdMinor': this.shippingQuote.costUsdMinor,
'breakdown': { ...this.shippingQuote.breakdown },
},
'deliveryEstimate': {
'transitHours': this.deliveryEstimate.transitHours,
'etaEpochMs': this.deliveryEstimate.etaEpochMs,
'etaIso': this.deliveryEstimate.etaIso,
'promisedEpochMs': this.deliveryEstimate.promisedEpochMs,
'onTime': this.deliveryEstimate.onTime,
'delayHours': this.deliveryEstimate.delayHours,
},
'legKm': this.legKm,
'coldChainBreach': this.coldChainBreach,
'customsDwellHours': this.customsDwellHours,
// Reuse the shared serializer so reverseGeocodeRun, ipGeolocate*, geoConfidence and
// geoModalities survive a snapshot/restore round-trip (restoreData reads them back).
'routing': CartographerState.routingToJson(this.routing),
'gdprResult': {
'personalDataFields': [...this.gdprResult.personalDataFields],
'sensitiveDataFields': [...this.gdprResult.sensitiveDataFields],
'consentStatus': this.gdprResult.consentStatus,
'lawfulBasis': this.gdprResult.lawfulBasis,
'jurisdiction': this.gdprResult.jurisdiction,
'strictness': this.gdprResult.strictness,
'complianceScore': this.gdprResult.complianceScore,
'retention': {
'retainUntil': this.gdprResult.retention.retainUntil,
'autoDelete': this.gdprResult.retention.autoDelete,
},
'redactionApplied': this.gdprResult.redactionApplied,
'marketingAnalyticsEligible': this.gdprResult.marketingAnalyticsEligible,
'coordsCoarsened': this.gdprResult.coordsCoarsened,
},
'enriched': CartographerState.enrichedToJson(this.enriched),
};
}
protected override restoreData(snap: JsonObject): void {
if (typeof snap['eventCount'] === 'number') this.eventCount = snap['eventCount'];
if (Array.isArray(snap['sources'])) {
this.sources = snap['sources'].map((s) => CartographerState.sourceFromJson(CartographerState.asObject(s) ?? {}));
}
if (Array.isArray(snap['ingestBuckets'])) {
this.ingestBuckets = snap['ingestBuckets'].map((bucket) =>
Array.isArray(bucket)
? bucket.map((e) => CartographerState.canonicalFromJson(CartographerState.asObject(e) ?? {}))
: [],
);
}
if (Array.isArray(snap['canonicalEvents'])) {
this.canonicalEvents = snap['canonicalEvents'].map((e) => CartographerState.canonicalFromJson(CartographerState.asObject(e) ?? {}));
}
const canObj = CartographerState.asObject(snap['canonical']);
if (canObj !== null) this.canonical = CartographerState.canonicalFromJson(canObj);
if (Array.isArray(snap['records'])) {
this.records = snap['records'].map((r) => CartographerState.enrichedFromJson(CartographerState.asObject(r) ?? {}));
}
const rawObj = CartographerState.asObject(snap['raw']);
if (rawObj !== null) this.raw = CartographerState.rawFromJson(rawObj);
const normObj = CartographerState.asObject(snap['normalized']);
if (normObj !== null) this.normalized = CartographerState.normalizedFromJson(normObj);
const ceObj = CartographerState.asObject(snap['currentEvent']);
if (ceObj !== null) this.currentEvent = CartographerState.eventFromJson(ceObj);
const gcObj = CartographerState.asObject(snap['geoContext']);
if (gcObj !== null) {
this.geoContext = {
'gridZone': CartographerState.str(gcObj['gridZone']),
'country': CartographerState.str(gcObj['country']),
'continent': CartographerState.str(gcObj['continent'], 'Unmapped'),
'countries': CartographerState.strArr(gcObj['countries']),
'region': CartographerState.str(gcObj['region']),
'hub': CartographerState.str(gcObj['hub']),
'status': CartographerState.geoStatus(gcObj['status']),
'waterBodies': CartographerState.strArr(gcObj['waterBodies']),
'timezone': CartographerState.str(gcObj['timezone'], 'UTC'),
'jurisdiction': CartographerState.jurisdiction(gcObj['jurisdiction']),
};
}
const poObj = CartographerState.asObject(snap['pricedOrder']);
if (poObj !== null) this.pricedOrder = CartographerState.pricedOrderFromJson(poObj);
const sqObj = CartographerState.asObject(snap['shippingQuote']);
if (sqObj !== null) {
const bdObj = CartographerState.asObject(sqObj['breakdown']) ?? {};
this.shippingQuote = {
'distanceKm': CartographerState.num(sqObj['distanceKm']),
'costUsdMinor': CartographerState.num(sqObj['costUsdMinor']),
'breakdown': {
'baseMinor': CartographerState.num(bdObj['baseMinor']),
'perKmMinor': CartographerState.num(bdObj['perKmMinor']),
'perKgMinor': CartographerState.num(bdObj['perKgMinor']),
'tierMultiplier': CartographerState.num(bdObj['tierMultiplier'], 1.0),
},
};
}
const deObj = CartographerState.asObject(snap['deliveryEstimate']);
if (deObj !== null) {
this.deliveryEstimate = {
'transitHours': CartographerState.num(deObj['transitHours']),
'etaEpochMs': CartographerState.num(deObj['etaEpochMs']),
'etaIso': CartographerState.str(deObj['etaIso']),
'promisedEpochMs': CartographerState.num(deObj['promisedEpochMs']),
'onTime': CartographerState.bool(deObj['onTime']),
'delayHours': CartographerState.num(deObj['delayHours']),
};
}
if (typeof snap['legKm'] === 'number') this.legKm = snap['legKm'];
if (typeof snap['coldChainBreach'] === 'boolean') this.coldChainBreach = snap['coldChainBreach'];
if (typeof snap['customsDwellHours'] === 'number') this.customsDwellHours = snap['customsDwellHours'];
if (snap['routing'] !== undefined) this.routing = CartographerState.routingFromJson(snap['routing']);
const grObj = CartographerState.asObject(snap['gdprResult']);
if (grObj !== null) {
const retObj = CartographerState.asObject(grObj['retention']) ?? {};
this.gdprResult = {
'personalDataFields': CartographerState.strArr(grObj['personalDataFields']),
'sensitiveDataFields': CartographerState.strArr(grObj['sensitiveDataFields']),
'consentStatus': CartographerState.consentStatus(grObj['consentStatus']),
'lawfulBasis': CartographerState.lawfulBasis(grObj['lawfulBasis']),
'jurisdiction': CartographerState.jurisdiction(grObj['jurisdiction']),
'strictness': CartographerState.strictness(grObj['strictness']),
'complianceScore': CartographerState.num(grObj['complianceScore']),
'retention': {
'retainUntil': CartographerState.str(retObj['retainUntil']),
'autoDelete': CartographerState.bool(retObj['autoDelete']),
},
'redactionApplied': CartographerState.bool(grObj['redactionApplied']),
'marketingAnalyticsEligible': CartographerState.bool(grObj['marketingAnalyticsEligible']),
'coordsCoarsened': CartographerState.bool(grObj['coordsCoarsened']),
};
}
const enObj = CartographerState.asObject(snap['enriched']);
if (enObj !== null) this.enriched = CartographerState.enrichedFromJson(enObj);
}
// ── Scalar narrowing helpers (no blanket `as unknown as` casts) ────────────
private static asObject(value: unknown): Record<string, unknown> | null {
if (value !== null && value !== undefined && typeof value === 'object' && !Array.isArray(value)) {
return value as Record<string, unknown>;
}
return null;
}
private static str(value: unknown, fallback: string = ''): string {
return typeof value === 'string' ? value : fallback;
}
private static num(value: unknown, fallback: number = 0): number {
return typeof value === 'number' ? value : fallback;
}
private static bool(value: unknown, fallback: boolean = false): boolean {
return typeof value === 'boolean' ? value : fallback;
}
private static strArr(value: unknown): string[] {
return Array.isArray(value) ? value.filter((v): v is string => typeof v === 'string') : [];
}
// ── CanonicalEvent / SourcePayload narrowers + reconstruction ──────────────
private static canonicalKind(value: unknown): CanonicalEvent['kind'] {
return value === 'position-ping' || value === 'facility-scan' || value === 'sensor-reading'
|| value === 'customs-event' || value === 'delivery-confirmation'
? value
: 'position-ping';
}
private static sourceFormat(value: unknown): SourcePayload['format'] {
return value === 'json' || value === 'csv' || value === 'ndjson.gz' ? value : 'json';
}
/** Deep-clone a CanonicalEvent (body, line items, and the optional geo / consentHandled / pii fields) for V8-stable copies. */
private static cloneCanonical(e: CanonicalEvent): CanonicalEvent {
const copy: CanonicalEvent = {
'shipmentId': e.shipmentId,
'eventId': e.eventId,
'epochMs': e.epochMs,
'kind': e.kind,
'sourceId': e.sourceId,
'sourceFormat': e.sourceFormat,
'body': { ...e.body, 'lineItems': e.body.lineItems.map((li) => ({ ...li })) },
};
if (e.geo !== undefined) copy.geo = { ...e.geo };
if (e.consentHandled !== undefined) copy.consentHandled = e.consentHandled;
if (e.pii !== undefined) copy.pii = e.pii;
return copy;
}
private static canonicalToJson(e: CanonicalEvent): JsonObject {
const body: JsonObject = {
'scanSeq': e.body.scanSeq,
'latitude': e.body.latitude,
'longitude': e.body.longitude,
'ipAddress': e.body.ipAddress,
'legFromLat': e.body.legFromLat,
'legFromLng': e.body.legFromLng,
'originLat': e.body.originLat,
'originLng': e.body.originLng,
'destLat': e.body.destLat,
'destLng': e.body.destLng,
'carrier': e.body.carrier,
'facilityId': e.body.facilityId,
'status': e.body.status,
'weight': e.body.weight,
'weightUnit': e.body.weightUnit,
'lineItems': e.body.lineItems.map((li) => ({ 'productId': li.productId, 'quantity': li.quantity })),
'rawTimestamp': e.body.rawTimestamp,
'rawDispatchAt': e.body.rawDispatchAt,
'rawPromisedDeliveryAt': e.body.rawPromisedDeliveryAt,
'disruptionReason': e.body.disruptionReason,
'tempC': e.body.tempC,
'humidityPct': e.body.humidityPct,
'shockG': e.body.shockG,
'customsStatus': e.body.customsStatus,
'delivered': e.body.delivered,
'recipientName': e.body.recipientName,
'recipientEmail': e.body.recipientEmail,
'recipientPhone': e.body.recipientPhone,
'recipientAddress': e.body.recipientAddress,
'recipientCountry': e.body.recipientCountry,
'marketingConsent': e.body.marketingConsent,
'lawfulBasis': e.body.lawfulBasis,
'specialCategory': e.body.specialCategory,
};
return {
'shipmentId': e.shipmentId,
'eventId': e.eventId,
'epochMs': e.epochMs,
'kind': e.kind,
'sourceId': e.sourceId,
'sourceFormat': e.sourceFormat,
'body': body,
'geo': e.geo !== undefined ? { 'country': e.geo.country, 'continent': e.geo.continent, 'region': e.geo.region } : null,
'consentHandled': e.consentHandled !== undefined ? e.consentHandled : null,
'pii': e.pii !== undefined ? e.pii : null,
};
}
private static canonicalFromJson(o: Record<string, unknown>): CanonicalEvent {
const b = CartographerState.asObject(o['body']) ?? {};
const event: CanonicalEvent = {
'shipmentId': CartographerState.str(o['shipmentId']),
'eventId': CartographerState.str(o['eventId']),
'epochMs': CartographerState.num(o['epochMs']),
'kind': CartographerState.canonicalKind(o['kind']),
'sourceId': CartographerState.str(o['sourceId']),
'sourceFormat': CartographerState.sourceFormat(o['sourceFormat']),
'body': {
'scanSeq': CartographerState.num(b['scanSeq']),
'latitude': CartographerState.num(b['latitude']),
'longitude': CartographerState.num(b['longitude']),
'ipAddress': CartographerState.str(b['ipAddress']),
'legFromLat': CartographerState.num(b['legFromLat']),
'legFromLng': CartographerState.num(b['legFromLng']),
'originLat': CartographerState.num(b['originLat']),
'originLng': CartographerState.num(b['originLng']),
'destLat': CartographerState.num(b['destLat']),
'destLng': CartographerState.num(b['destLng']),
'carrier': CartographerState.str(b['carrier']),
'facilityId': CartographerState.str(b['facilityId']),
'status': CartographerState.str(b['status']),
'weight': CartographerState.num(b['weight']),
'weightUnit': CartographerState.weightUnit(b['weightUnit']),
'lineItems': CartographerState.lineItemsFromJson(b['lineItems']),
'rawTimestamp': CartographerState.str(b['rawTimestamp']),
'rawDispatchAt': CartographerState.str(b['rawDispatchAt']),
'rawPromisedDeliveryAt': CartographerState.str(b['rawPromisedDeliveryAt']),
'disruptionReason': CartographerState.str(b['disruptionReason']),
'tempC': CartographerState.num(b['tempC']),
'humidityPct': CartographerState.num(b['humidityPct']),
'shockG': CartographerState.num(b['shockG']),
'customsStatus': CartographerState.str(b['customsStatus']),
'delivered': CartographerState.bool(b['delivered']),
'recipientName': CartographerState.str(b['recipientName']),
'recipientEmail': CartographerState.str(b['recipientEmail']),
'recipientPhone': CartographerState.str(b['recipientPhone']),
'recipientAddress': CartographerState.str(b['recipientAddress']),
'recipientCountry': CartographerState.str(b['recipientCountry']),
'marketingConsent': CartographerState.bool(b['marketingConsent']),
'lawfulBasis': CartographerState.lawfulBasis(b['lawfulBasis']),
'specialCategory': CartographerState.specialCategory(b['specialCategory']),
},
};
const geoObj = CartographerState.asObject(o['geo']);
if (geoObj !== null) {
event.geo = {
'country': CartographerState.str(geoObj['country']),
'continent': CartographerState.str(geoObj['continent']),
'region': CartographerState.str(geoObj['region']),
};
}
if (typeof o['consentHandled'] === 'boolean') event.consentHandled = o['consentHandled'];
if (typeof o['pii'] === 'boolean') event.pii = o['pii'];
return event;
}
private static sourceToJson(s: SourcePayload): JsonObject {
return {
'sourceId': s.sourceId,
'format': s.format,
'mappingKey': s.mappingKey,
'kind': s.kind,
'payload': s.payload,
};
}
private static sourceFromJson(o: Record<string, unknown>): SourcePayload {
return {
'sourceId': CartographerState.str(o['sourceId']),
'format': CartographerState.sourceFormat(o['format']),
'mappingKey': CartographerState.str(o['mappingKey'], 'json-position'),
'kind': CartographerState.canonicalKind(o['kind']),
'payload': CartographerState.str(o['payload']),
};
}
private static geoStatus(value: unknown): GeoContext['status'] {
return value === 'land' || value === 'water' || value === 'coastal' || value === 'unmapped'
? value
: 'unmapped';
}
private static consentStatus(value: unknown): GdprResult['consentStatus'] {
return value === 'valid' || value === 'missing' || value === 'expired' ? value : 'missing';
}
private static lawfulBasis(value: unknown): GdprResult['lawfulBasis'] {
return value === 'contract' || value === 'consent' || value === 'legitimate-interest' || value === 'none'
? value
: 'contract';
}
private static jurisdiction(value: unknown): GeoContext['jurisdiction'] {
return value === 'GDPR' || value === 'UK-GDPR' || value === 'CCPA'
|| value === 'LGPD' || value === 'APPI' || value === 'baseline'
|| value === 'international-waters'
? value
: 'baseline';
}
private static strictness(value: unknown): GdprResult['strictness'] {
return value === 'strict' || value === 'moderate' || value === 'light' ? value : 'light';
}
private static eventType(value: unknown): ShipmentEvent['eventType'] {
return value === 'SCAN' || value === 'DEPARTURE' || value === 'ARRIVAL'
|| value === 'OUT_FOR_DELIVERY' || value === 'DELIVERED' || value === 'EXCEPTION'
? value
: 'SCAN';
}
private static serviceTier(value: unknown): NormalizedShipment['serviceTier'] {
return value === 'express' || value === 'standard' || value === 'economy' ? value : 'standard';
}
private static sizeTier(value: unknown): NormalizedShipment['sizeTier'] {
return value === 'envelope' || value === 'small' || value === 'medium'
|| value === 'large' || value === 'freight'
? value
: 'small';
}
private static weightUnit(value: unknown): RawShipmentEvent['weightUnit'] {
return value === 'lb' || value === 'kg' || value === 'g' || value === 'oz' ? value : 'kg';
}
private static specialCategory(value: unknown): RawShipmentEvent['specialCategory'] {
return value === 'none' || value === 'health' ? value : 'none';
}
private static lineItemsFromJson(value: unknown): Array<{ 'productId': string; 'quantity': number }> {
if (!Array.isArray(value)) return [{ 'productId': '', 'quantity': 1 }];
const items = value
.map((li) => CartographerState.asObject(li))
.filter((li): li is Record<string, unknown> => li !== null)
.map((li) => ({
'productId': CartographerState.str(li['productId']),
'quantity': CartographerState.num(li['quantity'], 1),
}));
return items.length > 0 ? items : [{ 'productId': '', 'quantity': 1 }];
}
// ── Entity ↔ JSON reconstruction (field-by-field) ──────────────────────────
private static rawToJson(r: RawShipmentEvent): JsonObject {
return {
'shipmentId': r.shipmentId, 'scanSeq': r.scanSeq, 'rawTimestamp': r.rawTimestamp,
'rawDispatchAt': r.rawDispatchAt, 'rawStatus': r.rawStatus,
'carrier': r.carrier, 'ipAddress': r.ipAddress, 'latitude': r.latitude, 'longitude': r.longitude,
'legFromLat': r.legFromLat, 'legFromLng': r.legFromLng,
'originLat': r.originLat, 'originLng': r.originLng, 'destLat': r.destLat, 'destLng': r.destLng,
'weight': r.weight, 'weightUnit': r.weightUnit,
'recipientName': r.recipientName, 'recipientEmail': r.recipientEmail, 'recipientPhone': r.recipientPhone,
'recipientAddress': r.recipientAddress, 'recipientCountry': r.recipientCountry,
'marketingConsent': r.marketingConsent, 'rawPromisedDeliveryAt': r.rawPromisedDeliveryAt,
'lineItems': r.lineItems.map((li) => ({ 'productId': li.productId, 'quantity': li.quantity })),
'facilityId': r.facilityId, 'lawfulBasis': r.lawfulBasis, 'specialCategory': r.specialCategory,
'disruptionReason': r.disruptionReason,
};
}
private static rawFromJson(o: Record<string, unknown>): RawShipmentEvent {
return {
'shipmentId': CartographerState.str(o['shipmentId']),
'scanSeq': CartographerState.num(o['scanSeq']),
'rawTimestamp': CartographerState.str(o['rawTimestamp']),
'rawDispatchAt': CartographerState.str(o['rawDispatchAt']),
'rawStatus': CartographerState.str(o['rawStatus']),
'carrier': CartographerState.str(o['carrier']),
'ipAddress': CartographerState.str(o['ipAddress']),
'latitude': CartographerState.num(o['latitude']),
'longitude': CartographerState.num(o['longitude']),
'legFromLat': CartographerState.num(o['legFromLat']),
'legFromLng': CartographerState.num(o['legFromLng']),
'originLat': CartographerState.num(o['originLat']),
'originLng': CartographerState.num(o['originLng']),
'destLat': CartographerState.num(o['destLat']),
'destLng': CartographerState.num(o['destLng']),
'weight': CartographerState.num(o['weight']),
'weightUnit': CartographerState.weightUnit(o['weightUnit']),
'recipientName': CartographerState.str(o['recipientName']),
'recipientEmail': CartographerState.str(o['recipientEmail']),
'recipientPhone': CartographerState.str(o['recipientPhone']),
'recipientAddress': CartographerState.str(o['recipientAddress']),
'recipientCountry': CartographerState.str(o['recipientCountry']),
'marketingConsent': CartographerState.bool(o['marketingConsent']),
'rawPromisedDeliveryAt': CartographerState.str(o['rawPromisedDeliveryAt']),
'lineItems': CartographerState.lineItemsFromJson(o['lineItems']),
'facilityId': CartographerState.str(o['facilityId']),
'lawfulBasis': CartographerState.lawfulBasis(o['lawfulBasis']),
'specialCategory': CartographerState.specialCategory(o['specialCategory']),
'disruptionReason': CartographerState.str(o['disruptionReason']),
};
}
private static normalizedToJson(n: NormalizedShipment): JsonObject {
return {
'shipmentId': n.shipmentId, 'scanSeq': n.scanSeq, 'epochMs': n.epochMs, 'dispatchEpochMs': n.dispatchEpochMs,
'isoTimestamp': n.isoTimestamp, 'localIso': n.localIso, 'utcOffset': n.utcOffset,
'carrierId': n.carrierId, 'carrierName': n.carrierName, 'countryIso3': n.countryIso3,
'weightGrams': n.weightGrams, 'eventType': n.eventType, 'serviceTier': n.serviceTier, 'sizeTier': n.sizeTier,
'lineItems': n.lineItems.map((li) => ({ 'productId': li.productId, 'quantity': li.quantity })),
'facilityId': n.facilityId, 'latitude': n.latitude, 'longitude': n.longitude,
'legFromLat': n.legFromLat, 'legFromLng': n.legFromLng,
'originLat': n.originLat, 'originLng': n.originLng, 'destLat': n.destLat, 'destLng': n.destLng,
'recipientName': n.recipientName, 'recipientEmail': n.recipientEmail, 'recipientPhone': n.recipientPhone,
'recipientAddress': n.recipientAddress, 'recipientCountry': n.recipientCountry,
'marketingConsent': n.marketingConsent, 'promisedEpochMs': n.promisedEpochMs,
'disruptionHours': n.disruptionHours, 'disruptionReason': n.disruptionReason,
};
}
private static normalizedFromJson(o: Record<string, unknown>): NormalizedShipment {
return {
'shipmentId': CartographerState.str(o['shipmentId']),
'scanSeq': CartographerState.num(o['scanSeq']),
'epochMs': CartographerState.num(o['epochMs']),
'dispatchEpochMs': CartographerState.num(o['dispatchEpochMs']),
'isoTimestamp': CartographerState.str(o['isoTimestamp']),
'localIso': CartographerState.str(o['localIso']),
'utcOffset': CartographerState.str(o['utcOffset']),
'carrierId': CartographerState.str(o['carrierId']),
'carrierName': CartographerState.str(o['carrierName']),
'countryIso3': CartographerState.str(o['countryIso3'], 'UNK'),
'weightGrams': CartographerState.num(o['weightGrams']),
'eventType': CartographerState.eventType(o['eventType']),
'serviceTier': CartographerState.serviceTier(o['serviceTier']),
'sizeTier': CartographerState.sizeTier(o['sizeTier']),
'lineItems': CartographerState.lineItemsFromJson(o['lineItems']),
'facilityId': CartographerState.str(o['facilityId']),
'latitude': CartographerState.num(o['latitude']),
'longitude': CartographerState.num(o['longitude']),
'legFromLat': CartographerState.num(o['legFromLat']),
'legFromLng': CartographerState.num(o['legFromLng']),
'originLat': CartographerState.num(o['originLat']),
'originLng': CartographerState.num(o['originLng']),
'destLat': CartographerState.num(o['destLat']),
'destLng': CartographerState.num(o['destLng']),
'recipientName': CartographerState.str(o['recipientName']),
'recipientEmail': CartographerState.str(o['recipientEmail']),
'recipientPhone': CartographerState.str(o['recipientPhone']),
'recipientAddress': CartographerState.str(o['recipientAddress']),
'recipientCountry': CartographerState.str(o['recipientCountry']),
'marketingConsent': CartographerState.bool(o['marketingConsent']),
'promisedEpochMs': CartographerState.num(o['promisedEpochMs']),
'disruptionHours': CartographerState.num(o['disruptionHours']),
'disruptionReason': CartographerState.str(o['disruptionReason']),
};
}
private static eventToJson(e: ShipmentEvent): JsonObject {
return {
'shipmentId': e.shipmentId, 'timestamp': e.timestamp, 'eventType': e.eventType,
'latitude': e.latitude, 'longitude': e.longitude, 'carrier': e.carrier, 'facilityId': e.facilityId,
'recipientName': e.recipientName, 'recipientEmail': e.recipientEmail, 'recipientPhone': e.recipientPhone,
'recipientAddress': e.recipientAddress, 'recipientCountry': e.recipientCountry,
'marketingConsent': e.marketingConsent, 'promisedDeliveryAt': e.promisedDeliveryAt,
};
}
private static eventFromJson(o: Record<string, unknown>): ShipmentEvent {
return {
'shipmentId': CartographerState.str(o['shipmentId']),
'timestamp': CartographerState.str(o['timestamp']),
'eventType': CartographerState.eventType(o['eventType']),
'latitude': CartographerState.num(o['latitude']),
'longitude': CartographerState.num(o['longitude']),
'carrier': CartographerState.str(o['carrier']),
'facilityId': CartographerState.str(o['facilityId']),
'recipientName': CartographerState.str(o['recipientName']),
'recipientEmail': CartographerState.str(o['recipientEmail']),
'recipientPhone': CartographerState.str(o['recipientPhone']),
'recipientAddress': CartographerState.str(o['recipientAddress']),
'recipientCountry': CartographerState.str(o['recipientCountry']),
'marketingConsent': CartographerState.bool(o['marketingConsent']),
'promisedDeliveryAt': CartographerState.str(o['promisedDeliveryAt']),
};
}
private static pricedOrderToJson(p: PricedOrder): JsonObject {
return {
'lines': p.lines.map((l) => ({
'productId': l.productId, 'name': l.name, 'category': l.category, 'quantity': l.quantity,
'unitPriceMinor': l.unitPriceMinor, 'currency': l.currency, 'lineTotalMinor': l.lineTotalMinor,
})),
'subtotalMinor': p.subtotalMinor, 'currency': p.currency,
'subtotalUsdMinor': p.subtotalUsdMinor, 'fxRate': p.fxRate,
};
}
private static pricedOrderFromJson(o: Record<string, unknown>): PricedOrder {
const lines = Array.isArray(o['lines'])
? o['lines']
.map((l) => CartographerState.asObject(l))
.filter((l): l is Record<string, unknown> => l !== null)
.map((l) => ({
'productId': CartographerState.str(l['productId']),
'name': CartographerState.str(l['name']),
'category': CartographerState.str(l['category']),
'quantity': CartographerState.num(l['quantity'], 1),
'unitPriceMinor': CartographerState.num(l['unitPriceMinor']),
'currency': CartographerState.str(l['currency'], 'USD'),
'lineTotalMinor': CartographerState.num(l['lineTotalMinor']),
}))
: [];
return {
'lines': lines,
'subtotalMinor': CartographerState.num(o['subtotalMinor']),
'currency': CartographerState.str(o['currency'], 'USD'),
'subtotalUsdMinor': CartographerState.num(o['subtotalUsdMinor']),
'fxRate': CartographerState.num(o['fxRate'], 1.0),
};
}
private static enrichedToJson(e: EnrichedShipment): JsonObject {
return {
'shipmentId': e.shipmentId, 'scanSeq': e.scanSeq, 'epochMs': e.epochMs,
'localIso': e.localIso, 'utcOffset': e.utcOffset, 'timezone': e.timezone, 'jurisdiction': e.jurisdiction,
'continent': e.continent, 'region': e.region, 'country': e.country, 'hub': e.hub, 'status': e.status,
'lat': e.lat, 'lng': e.lng, 'coordsCoarsened': e.coordsCoarsened, 'legKm': e.legKm,
'eventType': e.eventType, 'serviceTier': e.serviceTier, 'sizeTier': e.sizeTier,
'onTime': e.onTime, 'exception': e.exception, 'consentStatus': e.consentStatus,
'disruptionReason': e.disruptionReason,
'subtotalUsdMinor': e.subtotalUsdMinor, 'currency': e.currency,
'shippingUsdMinor': e.shippingUsdMinor, 'distanceKm': e.distanceKm,
'transitHours': e.transitHours, 'delayHours': e.delayHours,
'redactionApplied': e.redactionApplied,
'redactedSample': {
'recipientName': e.redactedSample.recipientName,
'recipientEmail': e.redactedSample.recipientEmail,
'recipientPhone': e.redactedSample.recipientPhone,
},
'routing': CartographerState.routingToJson(e.routing),
};
}
/** The default routing record: path 'order', every flag false, zero geo confidence, no modalities (single source of truth). */
static defaultRouting(): EnrichedShipment['routing'] {
return {
'path': 'order',
'geoLookupRun': false,
'geoLookupSkipped': false,
'reverseGeocodeRun': false,
'ipGeolocateRun': false,
'ipGeolocateSkipped': false,
'geoConfidence': 0,
'geoModalities': [],
'redactionRun': false,
'redactionSkipped': false,
'pricingRun': false,
'pricingSkipped': false,
'etaRun': false,
'etaSkipped': false,
'coldChainRun': false,
'customsDwellRun': false,
};
}
/** An unresolved GeoCandidate for the given modality (the default). */
static unresolvedCandidate(modality: 'gps' | 'ip'): GeoCandidate {
return {
'modality': modality, 'resolved': false, 'country': '', 'countryName': '',
'continent': '', 'region': '', 'locality': '', 'lat': 0, 'lng': 0, 'water': false,
};
}
private static routingToJson(r: EnrichedShipment['routing']): JsonObject {
return {
'path': r.path,
'geoLookupRun': r.geoLookupRun,
'geoLookupSkipped': r.geoLookupSkipped,
'reverseGeocodeRun': r.reverseGeocodeRun,
'ipGeolocateRun': r.ipGeolocateRun,
'ipGeolocateSkipped': r.ipGeolocateSkipped,
'geoConfidence': r.geoConfidence,
'geoModalities': [...r.geoModalities],
'redactionRun': r.redactionRun,
'redactionSkipped': r.redactionSkipped,
'pricingRun': r.pricingRun,
'pricingSkipped': r.pricingSkipped,
'etaRun': r.etaRun,
'etaSkipped': r.etaSkipped,
'coldChainRun': r.coldChainRun,
'customsDwellRun': r.customsDwellRun,
};
}
private static routingPath(value: unknown): EnrichedShipment['routing']['path'] {
return value === 'geo-only' || value === 'sensor' || value === 'order' || value === 'customs'
? value
: 'order';
}
private static routingFromJson(value: unknown): EnrichedShipment['routing'] {
const o = CartographerState.asObject(value) ?? {};
return {
'path': CartographerState.routingPath(o['path']),
'geoLookupRun': CartographerState.bool(o['geoLookupRun']),
'geoLookupSkipped': CartographerState.bool(o['geoLookupSkipped']),
'reverseGeocodeRun': CartographerState.bool(o['reverseGeocodeRun']),
'ipGeolocateRun': CartographerState.bool(o['ipGeolocateRun']),
'ipGeolocateSkipped': CartographerState.bool(o['ipGeolocateSkipped']),
'geoConfidence': CartographerState.num(o['geoConfidence']),
'geoModalities': CartographerState.strArr(o['geoModalities']),
'redactionRun': CartographerState.bool(o['redactionRun']),
'redactionSkipped': CartographerState.bool(o['redactionSkipped']),
'pricingRun': CartographerState.bool(o['pricingRun']),
'pricingSkipped': CartographerState.bool(o['pricingSkipped']),
'etaRun': CartographerState.bool(o['etaRun']),
'etaSkipped': CartographerState.bool(o['etaSkipped']),
'coldChainRun': CartographerState.bool(o['coldChainRun']),
'customsDwellRun': CartographerState.bool(o['customsDwellRun']),
};
}
private static enrichedFromJson(o: Record<string, unknown>): EnrichedShipment {
const sample = CartographerState.asObject(o['redactedSample']) ?? {};
return {
'shipmentId': CartographerState.str(o['shipmentId']),
'scanSeq': CartographerState.num(o['scanSeq']),
'epochMs': CartographerState.num(o['epochMs']),
'localIso': CartographerState.str(o['localIso']),
'utcOffset': CartographerState.str(o['utcOffset']),
'timezone': CartographerState.str(o['timezone'], 'UTC'),
'jurisdiction': CartographerState.jurisdiction(o['jurisdiction']),
'continent': CartographerState.str(o['continent'], 'Unmapped'),
'region': CartographerState.str(o['region']),
'country': CartographerState.str(o['country']),
'hub': CartographerState.str(o['hub']),
'status': CartographerState.geoStatus(o['status']),
'lat': CartographerState.num(o['lat']),
'lng': CartographerState.num(o['lng']),
'coordsCoarsened': CartographerState.bool(o['coordsCoarsened']),
'legKm': CartographerState.num(o['legKm']),
'eventType': CartographerState.eventType(o['eventType']),
'serviceTier': CartographerState.serviceTier(o['serviceTier']),
'sizeTier': CartographerState.sizeTier(o['sizeTier']),
'onTime': CartographerState.bool(o['onTime']),
'exception': CartographerState.bool(o['exception']),
'consentStatus': CartographerState.consentStatus(o['consentStatus']),
'disruptionReason': CartographerState.str(o['disruptionReason']),
'subtotalUsdMinor': CartographerState.num(o['subtotalUsdMinor']),
'currency': CartographerState.str(o['currency'], 'USD'),
'shippingUsdMinor': CartographerState.num(o['shippingUsdMinor']),
'distanceKm': CartographerState.num(o['distanceKm']),
'transitHours': CartographerState.num(o['transitHours']),
'delayHours': CartographerState.num(o['delayHours']),
'redactionApplied': CartographerState.bool(o['redactionApplied']),
'redactedSample': {
'recipientName': CartographerState.str(sample['recipientName']),
'recipientEmail': CartographerState.str(sample['recipientEmail']),
'recipientPhone': CartographerState.str(sample['recipientPhone']),
},
'routing': CartographerState.routingFromJson(o['routing']),
};
}
}
CartographerServices
The services bag injected through the Dagonizer constructor options. Geo resolution uses swappable transport adapters: the GPS modality always uses the offline @rapideditor/country-coder dataset (deterministic, no HTTP); the IP modality calls the live freeipapi.com API when online and replays recorded fixtures in the smoke tests.
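The replay half of that swap can be sketched in a few lines. This is a hedged illustration in the spirit of RecordedIpGeolocator, whose real interface is not shown in this section: `PlaceSketch`, `IpGeolocatorSketch`, `FixtureIpGeolocator`, and the `geolocate(ip, signal)` method are hypothetical names. It answers from an in-memory IP table, honors the abort signal the same way the node code does, and never touches the network.

```typescript
// Hypothetical shapes: the real ReverseGeocoder / IpGeolocator / GeoCandidate
// types are not shown in this section, so these are illustrative stand-ins.
interface PlaceSketch {
  readonly country: string;
  readonly continent: string;
  readonly region: string;
}

interface IpGeolocatorSketch {
  geolocate(ip: string, signal: AbortSignal): Promise<PlaceSketch>;
}

// Fixture replay: answers from a committed IP -> place table, never the network.
class FixtureIpGeolocator implements IpGeolocatorSketch {
  private readonly fixtures: ReadonlyMap<string, PlaceSketch>;

  constructor(entries: Iterable<readonly [string, PlaceSketch]>) {
    this.fixtures = new Map(entries);
  }

  async geolocate(ip: string, signal: AbortSignal): Promise<PlaceSketch> {
    // Same cooperative-cancellation check the pipeline nodes perform.
    if (signal.aborted) throw new Error('Aborted');
    const hit = this.fixtures.get(ip);
    // A miss degrades to an 'Unmapped' place instead of throwing, so a replay
    // run stays deterministic even when a fixture entry is missing.
    return hit ?? { country: 'Unmapped', continent: 'Unmapped', region: 'Unmapped' };
  }
}
```

Because the transport sits behind an interface, swapping it for a live `fetch`-backed adapter changes nothing upstream; only the services bag passed to the Dagonizer constructor differs.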
export interface CartographerServices {
/** GPS-modality transport (reverse-geocode coords → place). */
readonly reverseGeocoder: ReverseGeocoder;
/** IP-modality transport (geolocate gateway IP → place). */
readonly ipGeolocator: IpGeolocator;
}
GeoResolvers
Factory that assembles the CartographerServices bag for the chosen backend.
export class GeoResolvers {
/** Live-IP services: offline country-coder reverse-geocode + live freeipapi IP geolocation. */
static live(): CartographerServices {
return {
'reverseGeocoder': new OfflineReverseGeocoder(),
'ipGeolocator': new LiveIpGeolocator(),
};
}
/** Recorded-IP services: offline country-coder reverse-geocode + fixture-replay IP geolocation.
* Deterministic and offline; used by the smoke tests and by the `--recorded` CLI flag. */
static recorded(): CartographerServices {
return {
'reverseGeocoder': new OfflineReverseGeocoder(),
'ipGeolocator': new RecordedIpGeolocator(),
};
}
}
Key nodes
seedEvents — pre-phase
The pre-phase node runs before the DAG entrypoint. It calls Sources.build(state.eventCount) to produce the four heterogeneous source feeds (JSON position-pings, CSV facility-scans, gzip NDJSON sensor-readings, JSON customs/delivery) and writes them to state.sources. The ingestion scatter then reads state.sources by path.
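The gzip NDJSON feed is the least familiar of the four formats. Below is a minimal round-trip sketch, assuming one JSON object per line. `SensorLine`, `encodeNdjsonGz`, and `decodeNdjsonGz` are hypothetical names and do not reflect the internals of Sources.build or the ingest sub-DAGs. The sketch uses Node's synchronous `node:zlib` for brevity; a browser build would use `DecompressionStream('gzip')` instead.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Hypothetical row shape for a sensor-reading line; the real CanonicalEvent body is richer.
interface SensorLine {
  readonly shipmentId: string;
  readonly tempC: number;
}

// Encode rows as newline-delimited JSON, then gzip the whole text.
function encodeNdjsonGz(rows: readonly SensorLine[]): Buffer {
  const text = rows.map((r) => JSON.stringify(r)).join('\n') + '\n';
  return gzipSync(Buffer.from(text, 'utf8'));
}

// Decode: gunzip, split on newlines, skip blank lines, parse one JSON object per line.
function decodeNdjsonGz(bytes: Uint8Array): SensorLine[] {
  const text = gunzipSync(bytes).toString('utf8');
  return text
    .split('\n')
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line) as SensorLine);
}
```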
export const seedEvents: NodeInterface<CartographerState, never, CartographerServices> = {
'name': 'seed',
'outputs': [],
async execute(state, context) {
if (context.signal.aborted) {
throw new Error('Aborted');
}
state.sources = await Sources.build(state.eventCount);
return { 'output': undefined as never };
},
};
normalize — local time at the scan's timezone
After geo-enrichment sets state.geoContext.timezone, normalize converts the raw timestamp to a UTC epoch, then derives the local time at the scan's IANA timezone using Intl.DateTimeFormat. Cross-zone journeys show different local times and UTC offsets per scan.
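A common way to derive both values from `Intl.DateTimeFormat` is sketched below. `localParts` and `LocalParts` are hypothetical stand-ins for TimeZoneResolver.localParts, whose implementation this section does not show. The technique is an assumption about the approach, not the author's code: format the instant in the target zone, read the wall-clock fields back as if they were UTC, and take the difference.

```typescript
// Hypothetical helper mirroring what TimeZoneResolver.localParts is described
// as returning; its real implementation is not shown in this section.
interface LocalParts {
  readonly localIso: string;  // 'YYYY-MM-DDTHH:MM:SS' wall-clock time in the zone
  readonly utcOffset: string; // e.g. '+09:00'
}

function localParts(epochMs: number, timeZone: string): LocalParts {
  const fmt = new Intl.DateTimeFormat('en-US', {
    timeZone,
    hourCycle: 'h23', // avoid '24' at midnight
    year: 'numeric', month: '2-digit', day: '2-digit',
    hour: '2-digit', minute: '2-digit', second: '2-digit',
  });
  // Collect the wall-clock fields the formatter produced for this instant.
  const f: Record<string, string> = {};
  for (const p of fmt.formatToParts(new Date(epochMs))) f[p.type] = p.value;
  const localIso = `${f.year}-${f.month}-${f.day}T${f.hour}:${f.minute}:${f.second}`;
  // Reading the wall clock back as UTC and diffing against the true instant
  // yields the zone's offset at that moment, DST included.
  const wallAsUtc = Date.UTC(Number(f.year), Number(f.month) - 1, Number(f.day),
    Number(f.hour), Number(f.minute), Number(f.second));
  const offsetMin = Math.round((wallAsUtc - epochMs) / 60_000);
  const sign = offsetMin >= 0 ? '+' : '-';
  const abs = Math.abs(offsetMin);
  const hh = String(Math.floor(abs / 60)).padStart(2, '0');
  const mm = String(abs % 60).padStart(2, '0');
  return { localIso, utcOffset: `${sign}${hh}:${mm}` };
}
```

The returned `localIso` keeps `HH:MM` at character offsets 11 to 16, which is the slice the CLI report prints.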
export const normalize: NodeInterface<CartographerState, 'normalized' | 'rejected', CartographerServices> = {
'name': 'normalize',
'outputs': ['normalized', 'rejected'],
async execute(state, context) {
if (context.signal.aborted) {
throw new Error('Aborted');
}
const raw = state.raw;
const epochMs = TimeNormalizer.toEpochMs(raw.rawTimestamp);
if (!isFinite(epochMs) || epochMs <= 0) {
return { 'output': 'rejected' };
}
const dispatchEpochMs = TimeNormalizer.toEpochMs(raw.rawDispatchAt);
const validDispatch = isFinite(dispatchEpochMs) && dispatchEpochMs > 0 ? dispatchEpochMs : epochMs;
const promisedEpochMs = TimeNormalizer.toEpochMs(raw.rawPromisedDeliveryAt);
const validPromised = isFinite(promisedEpochMs) && promisedEpochMs > 0 ? promisedEpochMs : validDispatch + 7 * 86_400_000;
// Local time at the scan's timezone (resolved by geo-context).
const { localIso, utcOffset } = TimeZoneResolver.localParts(epochMs, state.geoContext.timezone);
const { carrierId, carrierName } = CarrierRegistry.canonical(raw.carrier);
const countryIso3 = CountryCodes.toIso3(raw.recipientCountry);
const weightGrams = Units.toGrams(raw.weight, raw.weightUnit);
const disruptionHours = Disruptions.hoursFor(raw.disruptionReason);
state.normalized = {
'shipmentId': raw.shipmentId,
'scanSeq': raw.scanSeq,
'epochMs': epochMs,
'dispatchEpochMs': validDispatch,
'isoTimestamp': TimeNormalizer.toIso(epochMs),
'localIso': localIso,
'utcOffset': utcOffset,
'carrierId': carrierId,
'carrierName': carrierName,
'countryIso3': countryIso3,
'weightGrams': weightGrams,
// eventType / serviceTier / sizeTier are derived by the classify node.
'eventType': 'SCAN',
'serviceTier': 'standard',
'sizeTier': 'small',
'lineItems': raw.lineItems,
'facilityId': raw.facilityId,
'latitude': raw.latitude,
'longitude': raw.longitude,
'legFromLat': raw.legFromLat,
'legFromLng': raw.legFromLng,
'originLat': raw.originLat,
'originLng': raw.originLng,
'destLat': raw.destLat,
'destLng': raw.destLng,
'recipientName': raw.recipientName,
'recipientEmail': raw.recipientEmail,
'recipientPhone': raw.recipientPhone,
'recipientAddress': raw.recipientAddress,
'recipientCountry': raw.recipientCountry,
'marketingConsent': raw.marketingConsent,
'promisedEpochMs': validPromised,
'disruptionHours': disruptionHours,
'disruptionReason': raw.disruptionReason,
};
return { 'output': 'normalized' };
},
};
aggregateEvent — writes the enriched record
Pulls every enrichment result out of the scatter clone's state and assembles the compact EnrichedShipment record. The routing decisions, the redacted PII sample, and the pricing, shipping, and ETA figures all land here.
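The branching that fills those routing flags can be illustrated with two small functions. This sketch rests on assumed rules: `RoutableEvent`, `decideRouting`, and `tallyRouting` are hypothetical, and the real route-* nodes may test more conditions. Geo lookup is skipped when the source pre-resolved `geo`, and redaction is skipped only when the event explicitly says `pii: false`.

```typescript
// Hypothetical, simplified event: only the optional ingest-boundary hints
// that the CanonicalEvent schema carries (pre-resolved geo, pii flag).
interface RoutableEvent {
  readonly geo?: { readonly country: string; readonly continent: string; readonly region: string };
  readonly pii?: boolean;
}

interface RoutingSketch {
  readonly geoLookupRun: boolean;
  readonly redactionRun: boolean;
}

// Per-event decision (assumed rules, not the pipeline's exact predicates).
function decideRouting(ev: RoutableEvent): RoutingSketch {
  return {
    geoLookupRun: ev.geo === undefined, // pre-resolved geo => skip the lookup
    redactionRun: ev.pii !== false,     // unknown PII status => redact anyway
  };
}

// Parent-side fold: count RAN vs SKIPPED per branch across all per-scan records.
function tallyRouting(decisions: readonly RoutingSketch[]) {
  const t = { geoRan: 0, geoSkipped: 0, redactRan: 0, redactSkipped: 0 };
  for (const d of decisions) {
    if (d.geoLookupRun) t.geoRan++; else t.geoSkipped++;
    if (d.redactionRun) t.redactRan++; else t.redactSkipped++;
  }
  return t;
}
```

Treating a missing `pii` hint as "run redaction" is the conservative choice for GDPR: an unknown event pays for one extra sub-DAG rather than risking unredacted output.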
export const aggregateEvent: NodeInterface<CartographerState, 'done', CartographerServices> = {
'name': 'aggregate-event',
'outputs': ['done'],
async execute(state, context) {
if (context.signal.aborted) {
throw new Error('Aborted');
}
const norm = state.normalized;
const geo = state.geoContext;
const gdpr = state.gdprResult;
const po = state.pricedOrder;
const sq = state.shippingQuote;
const de = state.deliveryEstimate;
const ev = state.currentEvent;
const isException = norm.eventType === 'EXCEPTION';
state.enriched = {
'shipmentId': norm.shipmentId,
'scanSeq': norm.scanSeq,
'epochMs': norm.epochMs,
'localIso': norm.localIso,
'utcOffset': norm.utcOffset,
'timezone': geo.timezone,
'jurisdiction': geo.jurisdiction,
// Macro continent for the per-region insights rollup (from the real API).
'continent': geo.continent,
'region': geo.region,
'country': geo.country,
'hub': geo.hub,
'status': geo.status,
// Stored coords come from currentEvent, which GDPR coarsened in-place
// when the jurisdiction is strict or consent is not valid.
'lat': ev.latitude,
'lng': ev.longitude,
'coordsCoarsened': gdpr.coordsCoarsened,
'legKm': state.legKm,
'eventType': norm.eventType,
'serviceTier': norm.serviceTier,
'sizeTier': norm.sizeTier,
'onTime': de.onTime,
'exception': isException,
'consentStatus': gdpr.consentStatus,
'disruptionReason': norm.disruptionReason,
'subtotalUsdMinor': po.subtotalUsdMinor,
'currency': po.currency,
'shippingUsdMinor': sq.costUsdMinor,
'distanceKm': sq.distanceKm,
'transitHours': de.transitHours,
'delayHours': de.delayHours,
'redactionApplied': gdpr.redactionApplied,
'redactedSample': {
'recipientName': ev.recipientName,
'recipientEmail': ev.recipientEmail,
'recipientPhone': ev.recipientPhone,
},
// This scan's conditional-routing decisions (RAN vs SKIPPED per branch),
// recorded by the route-* nodes on this clone. The parent's summarize
// totals them into the savings view.
'routing': { ...state.routing },
};
return { 'output': 'done' };
},
};
summarizeInsights — fold into two views
After all scatter clones complete, summarizeInsights folds state.records into two views:
- Per-continent rollup (state.insights): scan counts, on-time rate, revenue (USD), and distance.
- Per-journey rollup (state.journeys): scans grouped by shipmentId and ordered by scanSeq (epochMs only breaks ties), with path distance, elapsed time, timezones crossed, and jurisdictions traversed.
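Both folds reduce to a keyed accumulation over the records. The sketch below uses a trimmed, hypothetical `ScanRecord` (a subset of EnrichedShipment with `routing.etaRun` flattened) and hypothetical `rollupByContinent` and `journeyTimezones` helpers. It mirrors two rules from the node code: water pings share one maritime bucket, and only ETA-bearing records count toward the on-time rate.

```typescript
// Minimal, hypothetical record shape: a subset of EnrichedShipment fields.
interface ScanRecord {
  readonly shipmentId: string;
  readonly scanSeq: number;
  readonly epochMs: number;
  readonly continent: string;
  readonly status: 'land' | 'water' | 'coastal' | 'unmapped';
  readonly timezone: string;
  readonly etaRun: boolean; // flattened from routing.etaRun
  readonly onTime: boolean;
}

interface ContinentRow { scans: number; onTime: number; late: number; }

// View 1: bucket by continent; on-time counts only records that ran the ETA node.
function rollupByContinent(records: readonly ScanRecord[]): Map<string, ContinentRow> {
  const out = new Map<string, ContinentRow>();
  for (const r of records) {
    const key = r.status === 'water' ? 'International Waters / Maritime' : r.continent;
    const row = out.get(key) ?? { scans: 0, onTime: 0, late: 0 };
    row.scans++;
    if (r.etaRun) {
      if (r.onTime) row.onTime++; else row.late++;
    }
    out.set(key, row);
  }
  return out;
}

// View 2: group by shipmentId, order by scanSeq (epochMs only breaks ties),
// and list the distinct timezones in journey order.
function journeyTimezones(records: readonly ScanRecord[]): Map<string, string[]> {
  const groups = new Map<string, ScanRecord[]>();
  for (const r of records) {
    const g = groups.get(r.shipmentId) ?? [];
    g.push(r);
    groups.set(r.shipmentId, g);
  }
  const out = new Map<string, string[]>();
  for (const [id, scans] of groups) {
    scans.sort((a, b) => a.scanSeq - b.scanSeq || a.epochMs - b.epochMs);
    const zones: string[] = [];
    for (const s of scans) if (!zones.includes(s.timezone)) zones.push(s.timezone);
    out.set(id, zones);
  }
  return out;
}
```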
export const summarizeInsights: NodeInterface<CartographerState, 'success', CartographerServices> = {
'name': 'summarize',
'outputs': ['success'],
async execute(state, context) {
if (context.signal.aborted) {
throw new Error('Aborted');
}
state.insights = new Map<string, RegionInsights>();
state.journeys = new Map<string, JourneyInsights>();
// ── (b) Group scans by shipmentId for per-journey reconstruction ──────────
const scansByShipment = new Map<string, JourneyScan[]>();
for (const record of state.records) {
if (!record.shipmentId) continue;
// ── (a) per-region accumulation (rolled up to CONTINENT) ────────────────
// Bucket by the macro continent the real geo API resolved (not the fine
// subdivision/country), so the table reads ~6–8 rows. Maritime pings (open
// water → no continent) collapse into one 'International Waters / Maritime'
// bucket. The continent is always present (default 'Unmapped' upstream),
// so the key is consistent — never a bare country code or subdivision.
const key = record.status === 'water'
? 'International Waters / Maritime'
: record.continent;
let entry = state.insights.get(key);
if (entry === undefined) {
entry = {
'region': key,
'country': key,
'hub': key,
'deliveries': 0,
'exceptions': 0,
'onTimeCount': 0,
'lateCount': 0,
'totalSubtotalUsdMinor': 0,
'totalShippingUsdMinor': 0,
'totalDistanceKm': 0,
'totalDelayHours': 0,
'consentValid': 0,
'consentMissing': 0,
'consentExpired': 0,
'sizeTierEnvelope': 0,
'sizeTierSmall': 0,
'sizeTierMedium': 0,
'sizeTierLarge': 0,
'sizeTierFreight': 0,
'shipmentCount': 0,
};
state.insights.set(key, entry);
}
entry.shipmentCount++;
entry.totalSubtotalUsdMinor += record.subtotalUsdMinor;
entry.totalShippingUsdMinor += record.shippingUsdMinor;
entry.totalDistanceKm += record.distanceKm;
// On-time is only meaningful for order-lane events that ran the ETA node;
// position/sensor/customs events skip pricing/eta (the branching saves it),
// so they are NOT counted toward on-time% (which would otherwise read 0%).
if (record.routing.etaRun) {
if (record.onTime) entry.onTimeCount++;
else {
entry.lateCount++;
entry.totalDelayHours += record.delayHours;
}
}
if (record.eventType === 'DELIVERED') entry.deliveries++;
if (record.exception) entry.exceptions++;
if (record.consentStatus === 'valid') entry.consentValid++;
if (record.consentStatus === 'missing') entry.consentMissing++;
if (record.consentStatus === 'expired') entry.consentExpired++;
switch (record.sizeTier) {
case 'envelope': entry.sizeTierEnvelope++; break;
case 'small': entry.sizeTierSmall++; break;
case 'medium': entry.sizeTierMedium++; break;
case 'large': entry.sizeTierLarge++; break;
case 'freight': entry.sizeTierFreight++; break;
}
// ── (b) collect the scan for this journey ───────────────────────────────
let scans = scansByShipment.get(record.shipmentId);
if (scans === undefined) {
scans = [];
scansByShipment.set(record.shipmentId, scans);
}
scans.push({
'scanSeq': record.scanSeq,
'epochMs': record.epochMs,
'localIso': record.localIso,
'utcOffset': record.utcOffset,
'timezone': record.timezone,
'jurisdiction': record.jurisdiction,
'eventType': record.eventType,
'hub': record.hub,
'region': record.region,
'country': record.country,
'lat': record.lat,
'lng': record.lng,
'legKm': record.legKm,
'disruptionReason': record.disruptionReason,
});
}
// ── (b) build the per-journey aggregates ──────────────────────────────────
for (const [shipmentId, rawScans] of scansByShipment) {
// Order by scanSeq (the authoritative journey order); epochMs is a display
// value that can collapse/reorder under lossy raw timestamp formats, so it
// is only a tiebreak.
const scans = [...rawScans].sort((a, b) => a.scanSeq - b.scanSeq || a.epochMs - b.epochMs);
const first = scans[0];
const last = scans[scans.length - 1];
if (first === undefined || last === undefined) continue;
let pathKm = 0;
let minEpoch = first.epochMs;
let maxEpoch = first.epochMs;
const offsets: string[] = [];
const timezones: string[] = [];
const jurisdictions: string[] = [];
const statusProgression: string[] = [];
let delivered = false;
for (const s of scans) {
pathKm += s.legKm;
if (s.epochMs < minEpoch) minEpoch = s.epochMs;
if (s.epochMs > maxEpoch) maxEpoch = s.epochMs;
if (!offsets.includes(s.utcOffset)) offsets.push(s.utcOffset);
if (!timezones.includes(s.timezone)) timezones.push(s.timezone);
if (!jurisdictions.includes(s.jurisdiction)) jurisdictions.push(s.jurisdiction);
statusProgression.push(s.eventType);
if (s.eventType === 'DELIVERED') delivered = true;
}
// Shipment-level facts (on-time, delay, pricing) come from an ORDER-lane
// record of the journey — one that actually ran pricing/eta. Position/
// sensor/customs scans skip that work, so prefer an eta-bearing record;
// fall back to any record only if the journey has no order-lane scan.
const orderRecord = state.records.find(
(r) => r.shipmentId === shipmentId && r.shipmentId.length > 0 && r.routing.etaRun,
);
const deliveryRecord = orderRecord
?? state.records.find((r) => r.shipmentId === shipmentId && r.shipmentId.length > 0);
const onTime = deliveryRecord?.onTime ?? false;
const delayHours = deliveryRecord?.delayHours ?? 0;
const subtotalUsdMinor = deliveryRecord?.subtotalUsdMinor ?? 0;
const shippingUsdMinor = deliveryRecord?.shippingUsdMinor ?? 0;
const journey: JourneyInsights = {
'shipmentId': shipmentId,
'scans': scans,
'scanCount': scans.length,
'pathKm': pathKm,
'firstEpochMs': minEpoch,
'lastEpochMs': maxEpoch,
'elapsedHours': (maxEpoch - minEpoch) / 3_600_000,
'timezones': timezones,
'offsets': offsets,
'jurisdictions': jurisdictions,
'statusProgression': statusProgression,
'lastStatus': last.eventType,
'lastHub': last.hub,
'delivered': delivered,
'onTime': onTime,
'delayHours': delayHours,
'subtotalUsdMinor': subtotalUsdMinor,
'shippingUsdMinor': shippingUsdMinor,
};
state.journeys.set(shipmentId, journey);
}
return { 'output': 'success' };
},
};
Entities
EnrichedShipment — the per-scan enriched record
import type { FromSchema } from 'json-schema-to-ts';
export const EnrichedShipmentSchema = {
'$id': 'https://noocodex.dev/schemas/cartographer/EnrichedShipment',
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'type': 'object',
'required': [
'shipmentId', 'scanSeq', 'epochMs', 'localIso', 'utcOffset', 'timezone', 'jurisdiction',
'continent', 'region', 'country', 'hub', 'status',
'lat', 'lng', 'coordsCoarsened', 'legKm',
'eventType', 'serviceTier', 'sizeTier',
'onTime', 'exception', 'consentStatus', 'disruptionReason',
'subtotalUsdMinor', 'currency',
'shippingUsdMinor', 'distanceKm',
'transitHours', 'delayHours',
'redactionApplied', 'redactedSample', 'routing',
],
'properties': {
'shipmentId': { 'type': 'string', 'minLength': 1 },
'scanSeq': { 'type': 'number', 'minimum': 0 },
'epochMs': { 'type': 'number' },
'localIso': { 'type': 'string' },
'utcOffset': { 'type': 'string' },
'timezone': { 'type': 'string' },
'jurisdiction': { 'type': 'string', 'enum': ['GDPR', 'UK-GDPR', 'CCPA', 'LGPD', 'APPI', 'baseline', 'international-waters'] },
// Macro continent (from a real API) — the per-region insights table buckets by this.
'continent': { 'type': 'string', 'minLength': 1 },
'region': { 'type': 'string', 'minLength': 1 },
'country': { 'type': 'string', 'minLength': 1 },
'hub': { 'type': 'string', 'minLength': 1 },
'status': { 'type': 'string', 'enum': ['land', 'water', 'coastal', 'unmapped'] },
'lat': { 'type': 'number' },
'lng': { 'type': 'number' },
'coordsCoarsened': { 'type': 'boolean' },
'legKm': { 'type': 'number', 'minimum': 0 },
'eventType': { 'type': 'string', 'enum': ['SCAN', 'DEPARTURE', 'ARRIVAL', 'OUT_FOR_DELIVERY', 'DELIVERED', 'EXCEPTION'] },
'serviceTier': { 'type': 'string', 'enum': ['express', 'standard', 'economy'] },
'sizeTier': { 'type': 'string', 'enum': ['envelope', 'small', 'medium', 'large', 'freight'] },
'onTime': { 'type': 'boolean' },
'exception': { 'type': 'boolean' },
'consentStatus': { 'type': 'string', 'enum': ['valid', 'missing', 'expired'] },
'disruptionReason': { 'type': 'string' },
'subtotalUsdMinor': { 'type': 'number', 'minimum': 0 },
'currency': { 'type': 'string', 'minLength': 3, 'maxLength': 3 },
'shippingUsdMinor': { 'type': 'number', 'minimum': 0 },
'distanceKm': { 'type': 'number', 'minimum': 0 },
'transitHours': { 'type': 'number', 'minimum': 0 },
'delayHours': { 'type': 'number', 'minimum': 0 },
'redactionApplied': { 'type': 'boolean' },
'redactedSample': {
'type': 'object',
'required': ['recipientName', 'recipientEmail', 'recipientPhone'],
'properties': {
'recipientName': { 'type': 'string' },
'recipientEmail': { 'type': 'string' },
'recipientPhone': { 'type': 'string' },
},
'additionalProperties': false,
},
// This scan's conditional-routing decisions (RAN vs SKIPPED per branch),
// including REAL geo-API call accounting (reverse-geocode + ip-geolocate).
'routing': {
'type': 'object',
'required': [
'path',
'geoLookupRun', 'geoLookupSkipped',
'reverseGeocodeRun', 'ipGeolocateRun', 'ipGeolocateSkipped',
'geoConfidence', 'geoModalities',
'redactionRun', 'redactionSkipped',
'pricingRun', 'pricingSkipped',
'etaRun', 'etaSkipped',
'coldChainRun', 'customsDwellRun',
],
'properties': {
// The per-kind enrichment lane this event took.
'path': { 'type': 'string', 'enum': ['geo-only', 'sensor', 'order', 'customs'] },
// Whether the whole geo-resolve sub-DAG (real API calls) ran or was skipped.
'geoLookupRun': { 'type': 'boolean' },
'geoLookupSkipped': { 'type': 'boolean' },
// Real API-call accounting inside geo-resolve.
'reverseGeocodeRun': { 'type': 'boolean' },
'ipGeolocateRun': { 'type': 'boolean' },
'ipGeolocateSkipped': { 'type': 'boolean' },
// Multi-modal fusion outcome carried for the report.
'geoConfidence': { 'type': 'number', 'minimum': 0, 'maximum': 1 },
'geoModalities': { 'type': 'array', 'items': { 'type': 'string' } },
'redactionRun': { 'type': 'boolean' },
'redactionSkipped': { 'type': 'boolean' },
'pricingRun': { 'type': 'boolean' },
'pricingSkipped': { 'type': 'boolean' },
'etaRun': { 'type': 'boolean' },
'etaSkipped': { 'type': 'boolean' },
'coldChainRun': { 'type': 'boolean' },
'customsDwellRun': { 'type': 'boolean' },
},
'additionalProperties': false,
},
},
'additionalProperties': false,
} as const;
export type EnrichedShipment = FromSchema<typeof EnrichedShipmentSchema>;
CanonicalEvent — the unified event model
import type { FromSchema } from 'json-schema-to-ts';
export const CanonicalEventSchema = {
'$id': 'https://noocodex.dev/schemas/cartographer/CanonicalEvent',
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'type': 'object',
'required': ['shipmentId', 'eventId', 'epochMs', 'kind', 'sourceId', 'sourceFormat', 'body'],
'properties': {
'shipmentId': { 'type': 'string', 'minLength': 1 },
'eventId': { 'type': 'string', 'minLength': 1 },
'epochMs': { 'type': 'number' },
'kind': {
'type': 'string',
'enum': ['position-ping', 'facility-scan', 'sensor-reading', 'customs-event', 'delivery-confirmation'],
},
// Provenance: which source + format this event was decoded from.
'sourceId': { 'type': 'string', 'minLength': 1 },
'sourceFormat': { 'type': 'string', 'enum': ['json', 'csv', 'ndjson.gz'] },
// Per-kind body. One object shape (V8-stable); a kind populates the fields
// it owns and zeroes/defaults the rest.
'body': {
'type': 'object',
'required': [
'scanSeq', 'latitude', 'longitude', 'ipAddress',
'legFromLat', 'legFromLng', 'originLat', 'originLng', 'destLat', 'destLng',
'carrier', 'facilityId', 'status',
'weight', 'weightUnit', 'lineItems',
'rawTimestamp', 'rawDispatchAt', 'rawPromisedDeliveryAt', 'disruptionReason',
'tempC', 'humidityPct', 'shockG',
'customsStatus', 'delivered',
'recipientName', 'recipientEmail', 'recipientPhone', 'recipientAddress', 'recipientCountry',
'marketingConsent', 'lawfulBasis', 'specialCategory',
],
'properties': {
'scanSeq': { 'type': 'number' },
'latitude': { 'type': 'number' },
'longitude': { 'type': 'number' },
// The asset's per-region public gateway IP (the IP modality's signal).
'ipAddress': { 'type': 'string' },
// journey geometry (previous-scan + shipment-level origin/destination)
'legFromLat': { 'type': 'number' },
'legFromLng': { 'type': 'number' },
'originLat': { 'type': 'number' },
'originLng': { 'type': 'number' },
'destLat': { 'type': 'number' },
'destLng': { 'type': 'number' },
'carrier': { 'type': 'string' },
'facilityId': { 'type': 'string' },
'status': { 'type': 'string' },
// parcel + basket
'weight': { 'type': 'number' },
'weightUnit': { 'type': 'string', 'enum': ['lb', 'kg', 'g', 'oz'] },
'lineItems': {
'type': 'array',
'items': {
'type': 'object',
'required': ['productId', 'quantity'],
'properties': {
'productId': { 'type': 'string' },
'quantity': { 'type': 'number' },
},
'additionalProperties': false,
},
},
// raw timestamps (scan / dispatch / SLA promise) for normalization + ETA
'rawTimestamp': { 'type': 'string' },
'rawDispatchAt': { 'type': 'string' },
'rawPromisedDeliveryAt': { 'type': 'string' },
'disruptionReason': { 'type': 'string' },
// sensor-reading channels (cold-chain telemetry)
'tempC': { 'type': 'number' },
'humidityPct': { 'type': 'number' },
'shockG': { 'type': 'number' },
// customs-event
'customsStatus': { 'type': 'string' },
// delivery-confirmation
'delivered': { 'type': 'boolean' },
// recipient PII (delivery / facility scans carry it raw)
'recipientName': { 'type': 'string' },
'recipientEmail': { 'type': 'string' },
'recipientPhone': { 'type': 'string' },
'recipientAddress': { 'type': 'string' },
'recipientCountry': { 'type': 'string' },
'marketingConsent': { 'type': 'boolean' },
'lawfulBasis': { 'type': 'string', 'enum': ['contract', 'consent', 'legitimate-interest', 'none'] },
'specialCategory': { 'type': 'string', 'enum': ['none', 'health'] },
},
'additionalProperties': false,
},
// OPTIONAL pre-resolved fields (ingest-boundary; Stage 2 branches on them).
'geo': {
'type': 'object',
'required': ['country', 'continent', 'region'],
'properties': {
'country': { 'type': 'string' },
'continent': { 'type': 'string' },
'region': { 'type': 'string' },
},
'additionalProperties': false,
},
'consentHandled': { 'type': 'boolean' },
'pii': { 'type': 'boolean' },
},
'additionalProperties': false,
} as const;
export type CanonicalEvent = FromSchema<typeof CanonicalEventSchema>;
GeoContext — geo-enrichment result
import type { FromSchema } from 'json-schema-to-ts';
export const GeoContextSchema = {
'$id': 'https://noocodex.dev/schemas/cartographer/GeoContext',
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'type': 'object',
'required': ['gridZone', 'country', 'continent', 'countries', 'region', 'hub', 'status', 'waterBodies', 'timezone', 'jurisdiction'],
'properties': {
'gridZone': { 'type': 'string', 'minLength': 1 },
'country': { 'type': 'string', 'minLength': 1 },
// Macro continent (from a real API) — the insights table buckets by this.
'continent': { 'type': 'string', 'minLength': 1 },
'countries': { 'type': 'array', 'items': { 'type': 'string' } },
'region': { 'type': 'string', 'minLength': 1 },
'hub': { 'type': 'string', 'minLength': 1 },
'status': { 'type': 'string', 'enum': ['land', 'water', 'coastal', 'unmapped'] },
'waterBodies': { 'type': 'array', 'items': { 'type': 'string' } },
'timezone': { 'type': 'string', 'minLength': 1 },
'jurisdiction': { 'type': 'string', 'enum': ['GDPR', 'UK-GDPR', 'CCPA', 'LGPD', 'APPI', 'baseline', 'international-waters'] },
},
'additionalProperties': false,
} as const;
export type GeoContext = FromSchema<typeof GeoContextSchema>;
Offline geo resolution
GPS reverse-geocode uses the offline @rapideditor/country-coder boundary dataset — no HTTP, no key, deterministic, runs identically in Node 18+ and the browser. IP geolocation uses the live freeipapi.com API (CORS-enabled, no key), or a committed fixture replay in the smoke tests.
export class OfflineReverseGeocoder implements ReverseGeocoder {
async lookup(lat: number, lng: number, signal: AbortSignal): Promise<GeoCandidate> {
if (signal.aborted) throw new Error('Aborted');
return OfflineGeo.resolve(lat, lng);
}
}
CLI
# Run with 200 journeys (live IP geolocation when network reachable):
npx tsx examples/the-cartographer/runCartographer.ts
# Force offline / recorded mode:
npx tsx examples/the-cartographer/runCartographer.ts --recorded
# Custom event count:
npx tsx examples/the-cartographer/runCartographer.ts --events 50
import { CartographerState } from './CartographerState.ts';
import type { JourneyInsights } from './CartographerState.ts';
import type { CartographerServices } from './CartographerServices.ts';
import { cartographerBundle } from './dag.ts';
import { canonicalizeBundle } from './embedded-dags/CanonicalizeDAG.ts';
import { gdprComplianceBundle } from './embedded-dags/GdprComplianceDAG.ts';
import { geoResolveBundle } from './embedded-dags/GeoResolveDAG.ts';
import { ingestSourceBundle } from './embedded-dags/IngestSourceDAG.ts';
import { ingestJsonBundle } from './embedded-dags/IngestJsonDAG.ts';
import { ingestCsvBundle } from './embedded-dags/IngestCsvDAG.ts';
import { ingestNdjsonGzBundle } from './embedded-dags/IngestNdjsonGzDAG.ts';
import { orderEnrichmentBundle } from './embedded-dags/OrderEnrichmentDAG.ts';
import type { EnrichedShipment } from './entities/EnrichedShipment.ts';
import { GeoResolvers } from './services/GeoResolvers.ts';
import { Dagonizer } from '@noocodex/dagonizer';
import { ExecutionError } from '@noocodex/dagonizer/errors';
// ── Parse CLI args ────────────────────────────────────────────────────────────
let eventCount = 200;
let forceRecorded = false;
const args = process.argv.slice(2);
for (let i = 0; i < args.length; i++) {
if (args[i] === '--events' && args[i + 1] !== undefined) {
const parsed = parseInt(args[i + 1] ?? '200', 10);
if (!isNaN(parsed) && parsed > 0) eventCount = parsed;
i++; // consume the flag's value so it is not re-read as a positional count
} else if (args[i] === '--recorded') {
forceRecorded = true;
} else if (/^\d+$/.test(args[i] ?? '')) {
const parsed = parseInt(args[i] ?? '200', 10);
if (!isNaN(parsed) && parsed > 0) eventCount = parsed;
}
}
// ── Geo backend selection: LIVE IP if a network is reachable, else RECORDED ────
// GPS reverse-geocode is ALWAYS offline (the `@rapideditor/country-coder` boundary
// dataset — deterministic, no network) — only the IP modality is a live API call.
// `useLive` selects the live freeipapi.com IP geolocator when reachable; otherwise
// (and with `--recorded`) the recorded IP fixture replays for a deterministic,
// offline run. The probe targets freeipapi (the only live modality).
async function networkReachable(): Promise<boolean> {
const probe = new AbortController();
const timer = setTimeout(() => probe.abort(), 4000);
try {
const res = await fetch('https://freeipapi.com/api/json/8.8.8.8', {
'signal': probe.signal,
'headers': { 'accept': 'application/json' },
});
return res.ok;
} catch {
return false;
} finally {
// Clear the timeout on every path so a failed probe does not keep the process alive.
clearTimeout(timer);
}
}
const useLive = !forceRecorded && (await networkReachable());
const services: CartographerServices = useLive ? GeoResolvers.live() : GeoResolvers.recorded();
// ── Dispatcher ────────────────────────────────────────────────────────────────
const dispatcher = new Dagonizer<CartographerState, CartographerServices>({ 'services': services });
dispatcher.registerBundle(geoResolveBundle);
dispatcher.registerBundle(canonicalizeBundle);
dispatcher.registerBundle(orderEnrichmentBundle);
dispatcher.registerBundle(gdprComplianceBundle);
dispatcher.registerBundle(ingestJsonBundle);
dispatcher.registerBundle(ingestCsvBundle);
dispatcher.registerBundle(ingestNdjsonGzBundle);
dispatcher.registerBundle(ingestSourceBundle);
dispatcher.registerBundle(cartographerBundle);
const state = new CartographerState();
state.eventCount = eventCount;
console.log(`\nCartographer: ${eventCount} journeys → multi-format sources → fan-in → streaming enrichment (concurrency=16)`);
console.log(`Geo backend: offline country-coder reverse-geocode + ${useLive ? 'LIVE freeipapi.com IP geolocation' : 'RECORDED IP fixture replay (offline)'}\n`);
// ── Execute ───────────────────────────────────────────────────────────────────
const ac = new AbortController();
process.once('SIGINT', () => {
console.log('\n[SIGINT] Aborting pipeline...');
ac.abort();
});
let dotCount = 0;
try {
const execution = dispatcher.execute('cartographer', state, { 'signal': ac.signal });
for await (const stage of execution) {
if (!stage.skipped) {
process.stdout.write('.');
dotCount++;
if (dotCount % 80 === 0) process.stdout.write('\n');
}
}
await execution;
} catch (err) {
if (err instanceof ExecutionError) {
console.error(`\nExecution failed: ${err.message}`);
process.exit(1);
}
throw err;
}
if (dotCount > 0) process.stdout.write('\n');
const processedRecords = state.records.filter((r) => r.shipmentId.length > 0);
// ── (0a) Multi-source ingestion provenance: per-source / per-format counts ─────
console.log('=== (0) Ingestion fan-in — ≥3 formats decoded via SHARED nodes → one model ===\n');
const COL_SRC = 26;
const COL_FMT = 12;
const ihdr = 'Source'.padEnd(COL_SRC) + 'Format'.padEnd(COL_FMT) + 'Events'.padStart(8);
console.log(ihdr);
console.log('-'.repeat(ihdr.length));
const eventsBySource = new Map<string, number>();
for (const ev of state.canonicalEvents) {
eventsBySource.set(ev.sourceId, (eventsBySource.get(ev.sourceId) ?? 0) + 1);
}
const formatBySource = new Map<string, string>();
for (const s of state.sources) formatBySource.set(s.sourceId, s.format);
const distinctFormats = new Set<string>();
for (const s of state.sources) distinctFormats.add(s.format);
for (const s of state.sources) {
const count = eventsBySource.get(s.sourceId) ?? 0;
console.log(
s.sourceId.slice(0, COL_SRC - 1).padEnd(COL_SRC) +
s.format.padEnd(COL_FMT) +
String(count).padStart(8),
);
}
console.log(`\nDistinct source formats decoded: ${distinctFormats.size} (${[...distinctFormats].sort().join(', ')})`);
console.log(`Total canonical events fanned in: ${state.canonicalEvents.length}`);
// ── (0b) Per-kind counts on the unified canonical model ───────────────────────
console.log('\n=== (0b) Canonical kinds (heterogeneous events, one collection) ===\n');
const byKind = new Map<string, number>();
for (const ev of state.canonicalEvents) byKind.set(ev.kind, (byKind.get(ev.kind) ?? 0) + 1);
for (const kind of [...byKind.keys()].sort()) {
console.log(` ${kind.padEnd(24)} ${String(byKind.get(kind) ?? 0).padStart(6)}`);
}
const richGeo = state.canonicalEvents.filter((e) => e.geo !== undefined).length;
console.log(`\nEvents carrying pre-resolved geo (RICH source → Stage 2 can skip geo-lookup): ${richGeo}`);
console.log('');
// ── (a) Normalization sample — a multi-zone, multi-scan journey ───────────────
function fmtCoord(lat: number, lng: number): string {
const ns = `${Math.abs(lat).toFixed(2)}${lat >= 0 ? 'N' : 'S'}`;
const ew = `${Math.abs(lng).toFixed(2)}${lng >= 0 ? 'E' : 'W'}`;
return `${ns} ${ew}`;
}
// Prefer a journey that crosses >=2 timezones to show differing local offsets.
const multiZoneJourney =
[...state.journeys.values()].find((j) => j.scanCount >= 3 && j.offsets.length >= 2)
?? [...state.journeys.values()].find((j) => j.scanCount >= 2 && j.offsets.length >= 2)
?? [...state.journeys.values()].find((j) => j.scanCount >= 2);
console.log('=== (a) Normalization Sample — one journey, per-scan LOCAL time ===\n');
if (multiZoneJourney !== undefined) {
console.log(`${multiZoneJourney.shipmentId} (${multiZoneJourney.scanCount} scans, ${multiZoneJourney.timezones.length} timezone(s), offsets: ${multiZoneJourney.offsets.join(', ')})`);
for (const s of multiZoneJourney.scans) {
const time = s.localIso.slice(11, 16);
console.log(
` seq ${s.scanSeq} ${time} ${s.utcOffset.padEnd(7)} ${s.eventType.padEnd(16)} ` +
`${s.hub.slice(0, 18).padEnd(19)} ${fmtCoord(s.lat, s.lng).padEnd(20)} [${s.jurisdiction}]`,
);
}
}
// ── (b) Per-continent insights table ─────────────────────────────────────────
// Rolled up to the macro continent resolved by the geo stage (~6–8 rows), plus a
// single maritime bucket; the precise locality/country stays on each journey scan.
console.log('\n=== (b) Per-Continent Insights ===\n');
const COL_REGION = 34;
const COL_COUNT = 7;
const COL_EXC = 6;
const COL_ONTIME = 8;
const COL_REV = 12;
const COL_SHIP = 11;
const COL_DIST = 10;
const hdr =
'Continent'.padEnd(COL_REGION) +
'Scans'.padStart(COL_COUNT) +
'Exc'.padStart(COL_EXC) +
'OnTime%'.padStart(COL_ONTIME) +
'Rev $USD'.padStart(COL_REV) +
'Ship $USD'.padStart(COL_SHIP) +
'Avg km'.padStart(COL_DIST);
console.log(hdr);
console.log('-'.repeat(hdr.length));
const sortedRegions = [...state.insights.values()].sort((a, b) => a.region.localeCompare(b.region));
for (const r of sortedRegions) {
const total = r.onTimeCount + r.lateCount;
const onTimePct = total > 0 ? Math.round((r.onTimeCount / total) * 100) : 0;
const revUsd = (r.totalSubtotalUsdMinor / 100).toFixed(0);
const shipUsd = (r.totalShippingUsdMinor / 100).toFixed(0);
// Average distance per scan. Guard on the divisor so an empty bucket cannot print Infinity/NaN.
const distKm = r.shipmentCount > 0 ? Math.round(r.totalDistanceKm / r.shipmentCount).toString() : '0';
console.log(
r.region.slice(0, COL_REGION - 1).padEnd(COL_REGION) +
String(r.shipmentCount).padStart(COL_COUNT) +
String(r.exceptions).padStart(COL_EXC) +
`${onTimePct}%`.padStart(COL_ONTIME) +
`$${revUsd}`.padStart(COL_REV) +
`$${shipUsd}`.padStart(COL_SHIP) +
`${distKm}`.padStart(COL_DIST),
);
}
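// Illustrative sketch (not the pipeline's own aggregate node): the continent rows
// above are a roll-up of per-scan records keyed by resolved continent. Money is
// summed as integer minor units (cents) and divided by 100 only for display, and
// on-time % uses only scans with a known outcome (in-transit scans are excluded).
// `SketchScan`, `SketchRow`, `sketchRollUp`, and the 'Maritime' label for scans
// without a continent are hypothetical names chosen for this example.
interface SketchScan { continent?: string; subtotalUsdMinor: number; onTime?: boolean }
interface SketchRow { region: string; scans: number; subtotalUsdMinor: number; onTime: number; late: number }
function sketchRollUp(scans: readonly SketchScan[]): Map<string, SketchRow> {
  const rows = new Map<string, SketchRow>();
  for (const s of scans) {
    const region = s.continent ?? 'Maritime'; // assumed label for the maritime bucket
    const row = rows.get(region) ?? { region, scans: 0, subtotalUsdMinor: 0, onTime: 0, late: 0 };
    row.scans++;
    row.subtotalUsdMinor += s.subtotalUsdMinor; // integer cents: addition stays exact
    if (s.onTime === true) row.onTime++;
    else if (s.onTime === false) row.late++;    // undefined = still in transit, not counted
    rows.set(region, row);
  }
  return rows;
}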
console.log(`\nTotal processed scans: ${processedRecords.length} of ${state.records.length} gathered (${state.records.length - processedRecords.length} rejected/skipped)`);
console.log(`Journeys reconstructed: ${state.journeys.size}`);
// ── (b2) ROUTING SAVINGS VIEW (the thesis made tangible — §B0.7c) ─────────────
// Each clone recorded its own RAN/SKIPPED decisions on the enriched record; the
// parent totals them here and compares the actual node-executions against the
// naive "always-run every node for every event" maximum.
//
// Node cost model (the nodes a branch runs/skips per event):
// geo-lookup chain : validate-coords + geo-grid + geo-context = 3 nodes
// (skip path runs apply-geo = 1 node → 2 avoided per skip)
// order enrichment : enrich-pricing + enrich-shipping + enrich-eta = 3 nodes
// (skip path runs no nodes → 3 avoided)
// redaction sub-DAG: consent-gate + classify-pii + redact-pii = 3 nodes
// (skip path bypasses all 3 directly → 3 avoided)
const GEO_CHAIN_NODES = 3; // validate-coords, geo-grid, geo-context
const GEO_SKIP_ADAPTER = 1; // apply-geo
const ORDER_ENRICH_NODES = 3; // pricing, shipping, eta
const REDACTION_NODES = 3; // consent-gate, classify-pii, redact-pii
const REDACTION_SKIP_ADAPTER = 0; // no intermediate node on skip path
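// Worked example of the cost model above, as an illustrative sketch that is not
// wired into the run: the node count for ONE event given its three routing
// decisions, using the same per-branch costs (3/1, 3/0, 3/0). `SketchRouteFlags`
// and `sketchNodeCost` are hypothetical names; the real flags live on `r.routing`.
interface SketchRouteFlags {
  geoLookupRun: boolean; // false → apply-geo adapter (1 node) instead of the 3-node chain
  pricingRun: boolean;   // false → order enrichment skipped entirely (0 nodes)
  redactionRun: boolean; // false → redaction sub-DAG bypassed (0 nodes)
}
function sketchNodeCost(f: SketchRouteFlags): { actual: number; naive: number; avoided: number } {
  const geoChain = 3, geoAdapter = 1, orderEnrich = 3, redaction = 3;
  const naive = geoChain + orderEnrich + redaction; // always-run maximum: 9
  const actual =
    (f.geoLookupRun ? geoChain : geoAdapter) +
    (f.pricingRun ? orderEnrich : 0) +
    (f.redactionRun ? redaction : 0);
  return { actual, naive, avoided: naive - actual };
}
// A position ping with pre-resolved geo and no PII costs 1 + 0 + 0 = 1 node of 9,
// so it contributes 8 avoided node-executions to the headline total.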
let geoRun = 0, geoSkip = 0, redRun = 0, redSkip = 0, priceSkip = 0, etaSkip = 0;
let coldRun = 0, customsRun = 0;
// Geo modality accounting: reverse-geocode is offline (free); the avoidable REAL
// calls are IP geolocations (freeipapi.com).
let revgeoRun = 0, ipgeoRun = 0, ipgeoSkip = 0, fusedGpsIp = 0;
let actualNodes = 0, naiveNodes = 0;
const pathCounts = new Map<string, number>();
for (const r of processedRecords) {
const rt = r.routing;
if (rt.geoLookupRun) geoRun++;
if (rt.geoLookupSkipped) geoSkip++;
if (rt.reverseGeocodeRun) revgeoRun++;
if (rt.ipGeolocateRun) ipgeoRun++;
if (rt.ipGeolocateSkipped) ipgeoSkip++;
if (rt.geoModalities.includes('gps') && rt.geoModalities.includes('ip')) fusedGpsIp++;
if (rt.redactionRun) redRun++;
if (rt.redactionSkipped) redSkip++;
if (rt.pricingSkipped) priceSkip++;
if (rt.etaSkipped) etaSkip++;
if (rt.coldChainRun) coldRun++;
if (rt.customsDwellRun) customsRun++;
pathCounts.set(rt.path, (pathCounts.get(rt.path) ?? 0) + 1);
// Naive maximum: every event runs every branch's nodes.
naiveNodes += GEO_CHAIN_NODES + ORDER_ENRICH_NODES + REDACTION_NODES;
// Actual: only what this event's routing ran.
actualNodes += rt.geoLookupRun ? GEO_CHAIN_NODES : GEO_SKIP_ADAPTER;
if (rt.pricingRun) actualNodes += ORDER_ENRICH_NODES;
actualNodes += rt.redactionRun ? REDACTION_NODES : REDACTION_SKIP_ADAPTER;
}
const total = processedRecords.length;
const pct = (n: number, base: number): string => base > 0 ? `${Math.round((n / base) * 100)}%` : '0%';
const skippedNodes = naiveNodes - actualNodes;
const redactionPassesAvoided = redSkip;
const pricingEtaAvoided = priceSkip * ORDER_ENRICH_NODES;
// REAL API calls avoided: reverse-geocode runs offline (no network call to avoid);
// the avoidable real calls are IP geolocations (freeipapi.com). ip-geolocate runs
// only when a gateway IP is present and the geo sub-DAG isn't skipped. A per-IP
// cache collapses repeated IPs, so unique upstream calls can be well below the
// per-event count.
const ipGeolocateAvoided = geoSkip + ipgeoSkip; // skipped sub-DAG + GPS-only signals
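// Sketch of the per-IP caching described above (illustrative only; the real
// ip-geolocate node and its cache are defined elsewhere and may differ). It
// memoizes lookups by IP so each distinct gateway IP costs one upstream call.
// `SketchGeo`, `sketchCachedIpLookup`, and the injected resolver are hypothetical;
// nothing here contacts freeipapi.com. A real network resolver would be async,
// and caching the Promise instead of the value would also de-duplicate
// concurrent in-flight requests for the same IP.
interface SketchGeo { country: string }
function sketchCachedIpLookup(resolve: (ip: string) => SketchGeo) {
  const cache = new Map<string, SketchGeo>();
  let upstreamCalls = 0;
  return {
    lookup(ip: string): SketchGeo {
      const hit = cache.get(ip);
      if (hit !== undefined) return hit; // repeated IP: served from cache, no call
      upstreamCalls++;                   // first sighting: one upstream call
      const geo = resolve(ip);
      cache.set(ip, geo);
      return geo;
    },
    get upstreamCalls(): number { return upstreamCalls; },
  };
}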
console.log('\n=== (b2) Routing Savings — deterministic routing skips REAL API calls ===\n');
console.log(` HEADLINE: deterministic routing skipped ${skippedNodes.toLocaleString('en-US')} node-executions ` +
`(~${pct(skippedNodes, naiveNodes)} of the ${naiveNodes.toLocaleString('en-US')} always-run maximum).\n`);
console.log(' Geo resolution (the real-world win — don\'t hammer the API):');
console.log(` • reverse-geocode (offline country-coder, no network): RESOLVED ${revgeoRun} events · 0 API calls (deterministic, free, no key)`);
console.log(` • ip-geolocate (freeipapi.com, REAL API): RAN for ${ipgeoRun} events · AVOIDED ${ipGeolocateAvoided} (pre-resolved or no gateway IP)`);
console.log(' • per-IP caching collapses repeated IPs → UNIQUE upstream IP calls are at most the RAN count above.');
console.log(` • multi-modal fusion: ${fusedGpsIp} events fused GPS+IP (agreement → high confidence); the rest used a single modality or arrived pre-resolved.`);
console.log('');
console.log(` geo-resolve: RAN ${geoRun} · SKIPPED ${geoSkip} (${pct(geoSkip, total)} — source already resolved → geo sub-DAG + IP call avoided)`);
console.log(` redaction: RAN ${redRun} · SKIPPED ${redSkip} (${pct(redSkip, total)} — no PII / not required → redaction sub-DAG bypassed)`);
console.log(` pricing+eta: RAN ${total - priceSkip} · SKIPPED ${priceSkip} (${pct(priceSkip, total)} — non-order kinds carry no basket/delivery)`);
console.log(` per-kind lanes: ${[...pathCounts.entries()].sort(([a], [b]) => a.localeCompare(b)).map(([p, n]) => `${p}=${n}`).join(' ')}`);
console.log(` cold-chain-check RAN ${coldRun} (sensor lane only) · customs-dwell RAN ${customsRun} (customs lane only)`);
console.log('\n Compute avoided (beyond API calls):');
console.log(` • ${redactionPassesAvoided.toLocaleString('en-US')} redaction passes avoided — skip hashing/coarsening when there is no PII to protect.`);
console.log(` • ${pricingEtaAvoided.toLocaleString('en-US')} pricing/shipping/ETA node-executions avoided — don't price a position ping.`);
// ── (c) Per-journey summaries (a few) ─────────────────────────────────────────
function printJourney(j: JourneyInsights): void {
const km = Math.round(j.pathKm).toLocaleString('en-US');
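// Round once, in minutes, so e.g. 2.999h prints as 3h00m rather than 2h60m.
const elapsedTotalMin = Math.round(j.elapsedHours * 60);
const elapsedH = Math.floor(elapsedTotalMin / 60);
const elapsedM = elapsedTotalMin % 60;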
console.log(`${j.shipmentId} (${j.scanCount} scans, ${j.timezones.length} timezone(s))`);
for (const s of j.scans) {
const time = s.localIso.slice(11, 16);
const leg = `+${Math.round(s.legKm).toLocaleString('en-US')} km`; // distance of this leg, not cumulative
console.log(
` ${time} ${s.utcOffset.padEnd(7)} ${s.eventType.padEnd(16)} ` +
`${s.hub.slice(0, 18).padEnd(19)} ${fmtCoord(s.lat, s.lng).padEnd(20)} ${leg}`,
);
}
const tzCrossings = Math.max(0, j.offsets.length - 1);
const jurisLabel = j.jurisdictions.length > 1 ? j.jurisdictions.join('→') : (j.jurisdictions[0] ?? 'baseline');
const otLabel = j.delivered ? (j.onTime ? 'on-time' : `late ${j.delayHours}h`) : `in transit (${j.lastStatus})`;
console.log(` journey: ${km} km · ${elapsedH}h${String(elapsedM).padStart(2, '0')}m elapsed · ${tzCrossings} tz crossing(s) · jurisdiction ${jurisLabel} · ${otLabel}`);
}
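// Illustrative sketch: assuming the "+N km" legs and the journey total are
// great-circle distances (the project description mentions a haversine formula),
// this is a minimal haversine with an assumed mean Earth radius of 6371 km. The
// pipeline's own distance node and radius constant may differ; `sketchHaversineKm`
// and `sketchPathKm` are hypothetical names, not the pipeline's API.
function sketchHaversineKm(lat1: number, lng1: number, lat2: number, lng2: number): number {
  const R = 6371; // mean Earth radius in km (assumed)
  const toRad = (deg: number): number => (deg * Math.PI) / 180;
  const dLat = toRad(lat2 - lat1);
  const dLng = toRad(lng2 - lng1);
  const a =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) * Math.sin(dLng / 2) ** 2;
  return 2 * R * Math.asin(Math.min(1, Math.sqrt(a))); // clamp guards float drift above 1
}
// Summing consecutive legs gives a path length comparable to `j.pathKm`.
function sketchPathKm(points: ReadonlyArray<{ lat: number; lng: number }>): number {
  let km = 0;
  let prev: { lat: number; lng: number } | undefined;
  for (const p of points) {
    if (prev !== undefined) km += sketchHaversineKm(prev.lat, prev.lng, p.lat, p.lng);
    prev = p;
  }
  return km;
}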
console.log('\n=== (c) Per-Journey Summaries ===\n');
const journeysSorted = [...state.journeys.values()].sort((a, b) => b.scanCount - a.scanCount);
// Show a few: one multi-tz, one multi-jurisdiction, one delivered.
const shown = new Set<string>();
const picks: JourneyInsights[] = [];
const multiTz = journeysSorted.find((j) => j.offsets.length >= 2);
if (multiTz !== undefined) { picks.push(multiTz); shown.add(multiTz.shipmentId); }
const multiJuris = journeysSorted.find((j) => j.jurisdictions.length >= 2 && !shown.has(j.shipmentId));
if (multiJuris !== undefined) { picks.push(multiJuris); shown.add(multiJuris.shipmentId); }
const deliveredJourney = journeysSorted.find((j) => j.delivered && !shown.has(j.shipmentId));
if (deliveredJourney !== undefined) { picks.push(deliveredJourney); shown.add(deliveredJourney.shipmentId); }
for (const j of picks) {
printJourney(j);
console.log('');
}
const tzCrossingJourneys = [...state.journeys.values()].filter((j) => j.offsets.length >= 2).length;
const jurisChangeJourneys = [...state.journeys.values()].filter((j) => j.jurisdictions.length >= 2).length;
console.log(`Journeys crossing >=2 timezones: ${tzCrossingJourneys}`);
console.log(`Journeys changing jurisdiction mid-path: ${jurisChangeJourneys}`);
// ── (d) Location-driven redaction comparison ──────────────────────────────────
function printRedaction(label: string, rec: EnrichedShipment): void {
console.log(` [${label}] ${rec.shipmentId} jurisdiction=${rec.jurisdiction} consent=${rec.consentStatus}`);
console.log(` Name: ${rec.redactedSample.recipientName}`);
console.log(` Email: ${rec.redactedSample.recipientEmail}`);
console.log(` Phone: ${rec.redactedSample.recipientPhone}`);
console.log(` Coords: ${fmtCoord(rec.lat, rec.lng)} ${rec.coordsCoarsened ? '(COARSENED to grid centroid)' : '(precise)'}`);
}
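// Illustrative sketch of "COARSENED to grid centroid": the precise fix is replaced
// by the centre of the grid cell it falls in, so nearby points become
// indistinguishable. This assumes a square lat/lng grid with a hypothetical
// 1-degree default cell; the pipeline's actual grid and cell size are defined in
// its own nodes, not here. `sketchCoarsenToCentroid` is a hypothetical name.
function sketchCoarsenToCentroid(lat: number, lng: number, cellDeg = 1): { lat: number; lng: number } {
  const half = cellDeg / 2;
  const snap = (v: number, limit: number): number => {
    const centre = (Math.floor(v / cellDeg) + 0.5) * cellDeg;  // centre of the containing cell
    return Math.min(limit - half, Math.max(-limit + half, centre)); // keep poles/antimeridian in range
  };
  return { lat: snap(lat, 90), lng: snap(lng, 180) };
}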
const strictRecord = processedRecords.find(
(r) => r.coordsCoarsened && (r.jurisdiction === 'GDPR' || r.jurisdiction === 'UK-GDPR' || r.jurisdiction === 'LGPD'),
) ?? processedRecords.find((r) => r.coordsCoarsened);
const baselineRecord = processedRecords.find(
(r) => !r.coordsCoarsened && r.jurisdiction === 'baseline' && r.consentStatus === 'valid',
) ?? processedRecords.find((r) => !r.coordsCoarsened);
console.log('\n=== (d) Location-Driven Redaction (strict vs baseline) ===\n');
if (strictRecord !== undefined) printRedaction('strict', strictRecord);
if (baselineRecord !== undefined) {
console.log('');
printRedaction('baseline', baselineRecord);
}
console.log(`\nDone. ${state.insights.size} continent(s), ${state.journeys.size} journey(s). No Date.now. No Math.random.\n`);