
Ingestion Pipelines

An ingestion pipeline is the operational workflow that keeps your workspace data correct over time. A pipeline is made up of one or more connectors (the code), scheduled tasks (the trigger), and enrichment steps (NLP, embeddings, derived edges).

This page covers the lifecycle, the standard templates you can adapt, and the operational concerns that come with running pipelines in production.

Lifecycle

flowchart LR
    Design[Design schema] --> Initial[Initial load]
    Initial --> Validate[Validate counts and edges]
    Validate --> Incremental[Incremental sync]
    Incremental --> Enrich[Enrichment]
    Enrich --> Incremental
    Incremental --> Reconcile[Periodic reconciliation]
    Reconcile --> Incremental

A pipeline is never "done" — it has a steady state of incremental syncs punctuated by periodic reconciliation.

Standard templates

Full load

When: one-time bootstrap, small datasets that fit in memory, rebuilds after schema changes.

Shape:

await foreach (var row in source.StreamAllAsync())
{
    graph.TryAdd(/* ... */);
    if (++count % 500 == 0) await graph.CommitPendingAsync();
}
await graph.CommitPendingAsync();

Risks: expensive on large sources; can produce duplicates if the source emits the same record twice. Always run with stable keys.

Incremental sync (watermark-based)

When: most production pipelines.

Shape:

var since = await state.LoadCursorAsync();
var next  = DateTimeOffset.UtcNow;

await foreach (var row in source.StreamSinceAsync(since))
{
    graph.TryAdd(/* ... */);
    if (++count % 500 == 0) await graph.CommitPendingAsync();
}
await graph.CommitPendingAsync();
await state.SaveCursorAsync(next);

Notes: the cursor advances only on success. Save state at the end of the run, not in the middle.

Change-feed / webhook

When: near-real-time requirements, source supports a change feed.

Shape: a long-running worker that consumes events, upserts nodes, and commits every N events or every M seconds — whichever comes first. Idempotent writers handle out-of-order delivery.
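As a sketch of that commit-every-N-or-M pattern — `StreamEventsAsync` and the event mapping are assumptions about your source's change-feed client, not a built-in API:

```csharp
// Long-running change-feed worker: commit every 500 events or every
// 10 seconds, whichever comes first. Upserts by stable key stay idempotent
// even when the feed delivers events out of order or more than once.
var pending   = 0;
var lastFlush = DateTimeOffset.UtcNow;

await foreach (var evt in source.StreamEventsAsync(cancellationToken))
{
    graph.TryAdd(/* map evt to a node keyed by its stable source id */);

    pending++;
    if (pending >= 500 || DateTimeOffset.UtcNow - lastFlush > TimeSpan.FromSeconds(10))
    {
        await graph.CommitPendingAsync();
        pending   = 0;
        lastFlush = DateTimeOffset.UtcNow;
    }
}
await graph.CommitPendingAsync();
```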

Delete sync (reconciliation)

When: deletes in source matter (compliance, off-boarding, archival).

Shape: periodic scan that compares source primary keys to graph nodes; soft- or hard-deletes the difference. Run hourly/daily; less frequently if your deletes are rare.

var sourceKeys = new HashSet<string>(await source.ListAllKeysAsync());
await foreach (var node in graph.AllOfTypeAsync("Ticket"))
{
    var id = node.GetString("Id");
    if (!sourceKeys.Contains(id))
        graph.RemoveNode(node.UID);
}
await graph.CommitPendingAsync();

Backfill

When: filling in newly added properties, re-embedding after a model switch, applying NLP to historical content.

Shape: a scheduled task that walks the graph in pages, updates each affected node, and commits. Bounded by Take() and a stored progress cursor so multiple runs cover the whole graph without overlap.
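A minimal sketch of one such run, using the paging query API shown elsewhere on this page. `Skip()`, the offset-cursor helpers, and the embedding call are illustrative assumptions:

```csharp
// Process one bounded page per run; the stored offset makes repeated runs
// cover the whole graph without overlap.
var offset = await state.LoadBackfillOffsetAsync();
var page   = await Q().StartAt("Ticket").Skip(offset).Take(1000).EmitAsync("N");

foreach (var ticket in page)
{
    // Example backfill work: re-embed after a model switch.
    ticket.Set("Embedding", await Embeddings.ComputeAsync(ticket.GetString("Body")));
}
await graph.CommitPendingAsync();

// Advance only after a successful commit; reset once the last page is reached.
await state.SaveBackfillOffsetAsync(page.Count < 1000 ? 0 : offset + page.Count);
```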

Enrichment

When: derived signals — entity links, similarity edges, computed aggregates.

Shape: a scheduled task that walks recent (or all) nodes, applies a deterministic transformation, and writes derived edges or properties.

foreach (var ticket in await Q().StartAt("Ticket").Take(1000).EmitAsync("N"))
{
    var entities = await NLP.ExtractAsync(ticket.GetString("Body"));
    foreach (var e in entities)
    {
        var entityNode = graph.TryAdd(new Entity { Name = e.Canonical });
        graph.Link(ticket, entityNode, "MentionsEntity");
    }
}
await graph.CommitPendingAsync();

Triggers and scheduling

| Trigger | Latency | Operational cost | Best for |
| --- | --- | --- | --- |
| Cron / scheduled task | Minutes to hours | Lowest | Most pipelines |
| External orchestrator (Airflow, Prefect, GitHub Actions) | Same as cron | Adds an external system | Multi-stage DAGs that span systems |
| Webhook / event stream | Seconds | Highest (always-on worker) | Near-real-time UX |
| Manual | n/a | None | One-off imports |

For schedule-based ingestion that lives inside the workspace, see Scheduled Tasks. For external orchestrators, see Pipeline Orchestration.

Idempotency: the cardinal rule

A pipeline must be safe to run twice. Concretely:

  • Re-running over the same window must not change node or edge counts.
  • A failure mid-run must not corrupt the cursor.
  • A duplicate input row must not produce two nodes.

The cheap test: run the connector twice in a row against the same source state. Count nodes and edges before and after. Differences mean the pipeline is not idempotent.
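That test is easy to script. `RunConnectorAsync`, `CountNodesAsync`, and `CountEdgesAsync` are illustrative names for your connector entry point and whatever count queries your workspace exposes:

```csharp
// Run the connector twice against a frozen source state; counts must not move.
await RunConnectorAsync();
var nodes1 = await graph.CountNodesAsync();
var edges1 = await graph.CountEdgesAsync();

await RunConnectorAsync();
var nodes2 = await graph.CountNodesAsync();
var edges2 = await graph.CountEdgesAsync();

if (nodes1 != nodes2 || edges1 != edges2)
    throw new Exception(
        $"Not idempotent: nodes {nodes1} -> {nodes2}, edges {edges1} -> {edges2}");
```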

Retries and dead letters

Transient failures (network, rate limits, brief source unavailability) should retry with exponential backoff. Persistent failures (malformed records, schema mismatches) should land in a dead-letter queue, not crash the run.

async Task<T> WithRetry<T>(Func<Task<T>> op)
{
    var delay = TimeSpan.FromSeconds(2);
    for (var attempt = 1; ; attempt++)
    {
        try { return await op(); }
        catch (Exception ex) when (attempt < 5 && IsTransient(ex))
        {
            // Exponential backoff with random jitter, capped at 60 seconds per attempt.
            await Task.Delay(delay + TimeSpan.FromMilliseconds(Random.Shared.Next(0, 500)));
            delay = TimeSpan.FromSeconds(Math.Min(delay.TotalSeconds * 2, 60));
        }
    }
}

For dead letters: keep a side store of records that failed N times. Surface a metric and a daily report so they get attention.
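One way to sketch that path — the `deadLetters` store and the `DeadLetter` shape are assumptions for illustration, not a built-in API:

```csharp
// After retries are exhausted, park the record in a side store
// instead of failing the whole run.
try
{
    await WithRetry(() => IngestAsync(record));
}
catch (Exception ex)
{
    await deadLetters.SaveAsync(new DeadLetter
    {
        Key      = record.Key,
        Payload  = record.RawJson,
        Error    = ex.Message,
        FailedAt = DateTimeOffset.UtcNow,
    });
}
```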

Observability

Every pipeline run should emit:

  • start/end timestamps and duration;
  • counts processed, written, skipped, failed;
  • per-stage timings (read, transform, commit);
  • the resulting cursor;
  • the top N error reasons.

Forward these to your monitoring stack. The workspace's Monitoring page has the built-in dashboards; extend them with your own metrics endpoint if your pipelines run outside the workspace.
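A run summary can be one structured record per run; the shape below is a suggestion that mirrors the bullet list above, not a built-in type:

```csharp
// Emitted once per pipeline run and forwarded to the monitoring stack.
public sealed record PipelineRunReport(
    string         Pipeline,
    DateTimeOffset StartedAt,
    DateTimeOffset EndedAt,
    int            Processed,
    int            Written,
    int            Skipped,
    int            Failed,
    IReadOnlyDictionary<string, TimeSpan> StageTimings, // read / transform / commit
    string         ResultingCursor,
    IReadOnlyList<string> TopErrors);
```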

Secrets

All credentials — source-side API keys, the Curiosity ingestion token — live in a secret manager. Pipelines read them from env vars at startup. Never commit them to source. See Token scopes and Security.
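For example, failing fast at startup rather than mid-run (the variable name is illustrative):

```csharp
// Read the ingestion token from the environment; refuse to start without it.
var token = Environment.GetEnvironmentVariable("CURIOSITY_INGESTION_TOKEN")
            ?? throw new InvalidOperationException(
                   "CURIOSITY_INGESTION_TOKEN is not set");
```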

Promoting through environments

Same as application code:

  1. Develop against a local Workspace.
  2. Validate in staging with a representative subset of source data.
  3. Promote to production after counts, latencies, and error rates look right in staging.
  4. Roll back by reverting the connector binary and replaying from a cursor before the bad run.

See Deployment.

Operational checklist

  • Pipeline is idempotent (verified by running twice).
  • Cursor advances forward only and survives a mid-run crash.
  • Deletes in source are reflected in graph within one reconciliation cycle.
  • Retries on transient failures; dead-letter for persistent failures.
  • Metrics: count, duration, error rate, cursor lag.
  • Alerts: ingestion failure rate > 5%, cursor lag > expected SLA.
  • Secrets in a secret manager, not in source.
  • Connector binary versioned and pinned in CI.
  • Backup tested before any breaking schema change.

Next steps

© 2026 Curiosity. All rights reserved.