Technical Support: Data Ingestion

This page is a complete walkthrough of the technical-support connector — schema bootstrapping, idempotent upserts, ACL-aware ingestion, and incremental sync with checkpointing. The patterns here are reusable; the dataset isn't the point.

Architecture at a glance

flowchart LR
    Source[(Source<br/>case API)] -->|FetchPage| Fetcher
    Fetcher --> Mapper["Map<br/>(record → node + edges)"]
    Mapper -->|TryAdd / AddOrUpdate / Link| Graph[(Curiosity<br/>graph)]
    Mapper -->|RestrictAccessTo*| Graph
    Loop[Cursor loop] -->|since: lastSync| Fetcher
    Graph -->|max(UpdatedAt)| Checkpoint[(Checkpoint store)]
    Checkpoint --> Loop

The four moving parts:

  1. Schema bootstrap — declared once at startup, idempotent on the workspace.
  2. Fetcher — talks to the source, paginated, retried.
  3. Mapper — pure function: record → graph upserts. Easy to unit test.
  4. Loop driver — owns the cursor, owns the checkpoint, restartable.
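The seams between those parts can be sketched as plain interfaces. None of these names come from Curiosity.Library; they only mark who owns which responsibility:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative record shape; the real connector maps the source API's fields.
public sealed record SourceRecord(string ReferenceNumber, DateTimeOffset UpdatedAt);

// 2. Fetcher: talks to the source, one page at a time.
public interface ISourceFetcher
{
    Task<IReadOnlyList<SourceRecord>> FetchChangedAsync(DateTimeOffset since, int pageSize);
}

// 4. Loop driver state: the cursor survives restarts because it lives here,
//    not in process memory.
public interface ICheckpointStore
{
    Task<DateTimeOffset?> ReadAsync();
    Task WriteAsync(DateTimeOffset cursor);
}

// 1. Schema bootstrap and 3. Mapper stay plain functions (Steps 1 and 2 below),
//    so they need no interface at all.
```

Keeping the fetcher and checkpoint store behind interfaces makes the loop driver trivial to test with in-memory stubs.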

Step 1: connect and bootstrap the schema

using Curiosity.Library;
using TechnicalSupport.Schema;

using var graph = Graph.Connect(
    endpoint:      Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token:         Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "technical-support");

await graph.CreateNodeSchemaAsync<Nodes.Device>();
await graph.CreateNodeSchemaAsync<Nodes.Part>();
await graph.CreateNodeSchemaAsync<Nodes.Manufacturer>();
await graph.CreateNodeSchemaAsync<Nodes.Status>();
await graph.CreateNodeSchemaAsync<Nodes.SupportCase>();
await graph.CreateNodeSchemaAsync<Nodes.SupportCaseMessage>();
await graph.CreateEdgeSchemaAsync(typeof(Edges));

CreateNodeSchemaAsync is idempotent — running it on a workspace that already has the schema is a no-op. Always call it at startup so deployments don't drift.

Step 2: define the pure mapping function

The mapping function is the bit you'll change most often. Keep it free of I/O so unit tests don't need a workspace.

static void MapCase(SafeGraph graph, SourceCase src, Node statusNode, Node? deviceNode)
{
    var caseNode = graph.AddOrUpdate(new Nodes.SupportCase
    {
        Id      = src.ReferenceNumber,
        Summary = src.Summary,
        Content = src.Content,
        Time    = src.OpenedAt,
    });

    graph.Link(caseNode, statusNode, Edges.HasStatus, Edges.OfStatus);
    if (deviceNode is not null)
        graph.Link(caseNode, deviceNode, Edges.ForDevice, Edges.HasSupportCase);

    var i = 0;
    foreach (var message in src.Messages)
    {
        var msgNode = graph.AddOrUpdate(new Nodes.SupportCaseMessage
        {
            Id   = $"{src.ReferenceNumber}#{i++}",   // stable, hash-free key
            Body = message.Body,
            Time = message.Time,
        });
        graph.Link(caseNode, msgNode, Edges.HasMessage, Edges.OfCase);
    }
}

The key choices here:

  • AddOrUpdate keeps the case's Summary / Content in sync if the source mutates them.
  • Link takes both edge names so traversal works from either end.
  • Id = $"{src.ReferenceNumber}#{i}" makes each message node deterministic across re-runs, provided the source returns messages in a stable order.

Step 3: ingest ACLs alongside the data

If the source has permissions, mirror them at ingest time — bolting ACLs on later means a window where data is searchable but unprotected.

// One-time team setup (idempotent).
var supportTeam = await graph.CreateTeamAsync("Tier-2 Support");

foreach (var member in src.AssignedAgents)
{
    var user = await graph.CreateUserAsync(member.Login, member.Email, member.FirstName, member.LastName);
    graph.AddUserToTeam(user, supportTeam);
}

// Mirror source ACL onto the case.
graph.RestrictAccessToTeam(caseNode, supportTeam);
foreach (var watcher in src.IndividualWatchers)
{
    var watcherUser = await graph.CreateUserAsync(watcher.Login, watcher.Email, watcher.FirstName, watcher.LastName);
    graph.RestrictAccessToUser(caseNode, watcherUser);
}

After this:

  • A query made through Graph.CreateSearchAsUserAsync(request, userUID) will only return cases the user has access to.
  • Direct admin queries (Graph.CreateSearchAsync(request)) still see everything — that's intentional for batch / debugging code.

See Permission-aware search for the query-time half of this pattern.

Step 4: drive the loop with checkpointing

The cursor is the single source of truth for "how far did the connector get". Persist it outside process memory so a crash doesn't replay every record.

DateTimeOffset lastSync = await ReadCheckpointAsync() ?? DateTimeOffset.MinValue;

while (true)
{
    var batch = await source.FetchChangedAsync(since: lastSync, pageSize: 500);
    if (batch.Count == 0) break;

    foreach (var record in batch)
    {
        var status = graph.AddOrUpdate(new Nodes.Status   { Name = record.Status        });
        var device = record.Device is null ? null
                   : graph.AddOrUpdate(new Nodes.Device   { Name = record.Device        });
        MapCase(graph, record, status, device);
    }

    await graph.CommitPendingAsync();

    lastSync = batch.Max(r => r.UpdatedAt);
    await WriteCheckpointAsync(lastSync);
}
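The loop above calls ReadCheckpointAsync and WriteCheckpointAsync without defining them. A minimal file-backed sketch (the file name is an assumption; any durable store outside process memory works):

```csharp
using System;
using System.Globalization;
using System.IO;
using System.Threading.Tasks;

static class Checkpoint
{
    // Hypothetical location; in production use whatever durable store you trust.
    const string CursorFile = "technical-support.checkpoint";

    public static async Task<DateTimeOffset?> ReadCheckpointAsync()
    {
        if (!File.Exists(CursorFile)) return null;   // first run: no cursor yet
        var text = await File.ReadAllTextAsync(CursorFile);
        return DateTimeOffset.Parse(text, CultureInfo.InvariantCulture,
                                    DateTimeStyles.RoundtripKind);
    }

    public static async Task WriteCheckpointAsync(DateTimeOffset cursor)
    {
        // Write to a temp file, then rename, so the checkpoint is never half-written.
        var tmp = CursorFile + ".tmp";
        await File.WriteAllTextAsync(tmp, cursor.ToString("O", CultureInfo.InvariantCulture));
        File.Move(tmp, CursorFile, overwrite: true);
    }
}
```

The round-trip "O" format preserves the offset, and the write-then-rename keeps a crash mid-write from corrupting the cursor.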

Two non-obvious details:

  • Commit before checkpoint. CommitPendingAsync must finish before you persist the new cursor. If the process crashes between commit and checkpoint, the next run simply re-ingests the last batch (safe, because the upserts are idempotent). Reverse the order and a crash can advance the cursor past data that never landed.
  • Source ordering matters. The source must guarantee that once it has returned every record with updated_at <= lastSync, no record with an earlier timestamp will appear later. If it can't (e.g. multiple writers with clock skew), back the cursor up by a small grace period on every run.
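The grace-period adjustment is small enough to isolate into a helper (the 30-second window in the test below is arbitrary; size it to the skew you actually observe):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class Cursors
{
    // Advance to the newest timestamp in the batch, minus a grace window, so
    // records written by slightly skewed clocks are re-fetched on the next pass.
    // Idempotent upserts make the duplicated work harmless.
    public static DateTimeOffset Advance(IEnumerable<DateTimeOffset> updatedAts, TimeSpan grace)
        => updatedAts.Max() - grace;
}
```

In the loop, `lastSync = Cursors.Advance(batch.Select(r => r.UpdatedAt), grace);` replaces the plain Max.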

Step 5: tune for backfills and high-volume runs

For an initial backfill, the per-batch commit pattern above is too chatty. Two knobs from Curiosity.Library:

graph.PauseIndexing("initial-backfill");
graph.SetAutoCommitCost(everyNodes: 10_000);

try
{
    await IngestEverythingAsync(graph);
}
finally
{
    await graph.CommitPendingAsync();
    graph.ResumeIndexing("initial-backfill");
}

What changes:

  • PauseIndexing defers search-index updates until you resume — the workspace rebuilds once at the end instead of after every batch.
  • SetAutoCommitCost lets the library decide when to flush instead of you doing it per batch.

Don't leave indexing paused on a long-running connector — search results go stale until you ResumeIndexing. Wrap the call in try/finally so a thrown exception still resumes.

Step 6: validate after the run

A quick set of asserts catches the common ingestion bugs — duplicates, missing edges, orphan nodes.

// Counts
var caseCount   = Q().StartAt(nameof(Nodes.SupportCase)).Count();
var deviceCount = Q().StartAt(nameof(Nodes.Device)).Count();

// Orphan check: every case must have exactly one status
var orphans = Q()
    .StartAt(nameof(Nodes.SupportCase))
    .Where(c => c.Out(Edges.HasStatus).Count() == 0)
    .Take(10)
    .Emit();

For longer-running connectors, expose these as a scheduled task that writes the result to a log node so you can dashboard it.
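The "log node" suggestion might look like the following, assuming a Nodes.IngestionReport type declared alongside the schema in Step 1 — the type and its fields are this sketch's invention, not part of the connector above:

// Hypothetical report node; bootstrap its schema at startup like the others:
//   await graph.CreateNodeSchemaAsync<Nodes.IngestionReport>();
graph.AddOrUpdate(new Nodes.IngestionReport
{
    Id          = $"report-{DateTimeOffset.UtcNow:yyyy-MM-dd}",   // one node per day
    CaseCount   = caseCount,
    DeviceCount = deviceCount,
    OrphanCount = orphans.Count,
    Time        = DateTimeOffset.UtcNow,
});
await graph.CommitPendingAsync();

Because the Id is date-keyed, re-running validation on the same day updates the same node instead of piling up duplicates.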

What to take away for production projects

  • Stable keys, mapped from real source identifiers — no GUIDs.
  • Explicit edges, declared with both directions.
  • ACLs at ingest time, mirrored from the source.
  • Checkpoint discipline — commit before persisting the cursor, never the other way around.
  • Pause indexing for backfills, resume after.