Technical Support: Data Ingestion
This page is a complete walkthrough of the technical-support connector — schema bootstrapping, idempotent upserts, ACL-aware ingestion, and incremental sync with checkpointing. The patterns here are reusable; the dataset isn't the point.
Architecture at a glance
The four moving parts:
- Schema bootstrap — declared once at startup, idempotent on the workspace.
- Fetcher — talks to the source, paginated, retried.
- Mapper — pure function: record → graph upserts. Easy to unit test.
- Loop driver — owns the cursor, owns the checkpoint, restartable.
Step 1: connect and bootstrap the schema
using Curiosity.Library;
using TechnicalSupport.Schema;
using var graph = Graph.Connect(
endpoint: Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
token: Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
connectorName: "technical-support");
await graph.CreateNodeSchemaAsync<Nodes.Device>();
await graph.CreateNodeSchemaAsync<Nodes.Part>();
await graph.CreateNodeSchemaAsync<Nodes.Manufacturer>();
await graph.CreateNodeSchemaAsync<Nodes.Status>();
await graph.CreateNodeSchemaAsync<Nodes.SupportCase>();
await graph.CreateNodeSchemaAsync<Nodes.SupportCaseMessage>();
await graph.CreateEdgeSchemaAsync(typeof(Edges));
CreateNodeSchemaAsync is idempotent — running it on a workspace that already has the schema is a no-op. Always call it at startup so deployments don't drift.
Step 2: define the pure mapping function
The mapping function is the bit you'll change most often. Keep it free of I/O so unit tests don't need a workspace.
static void MapCase(SafeGraph graph, SourceCase src, Node statusNode, Node? deviceNode)
{
var caseNode = graph.AddOrUpdate(new Nodes.SupportCase
{
Id = src.ReferenceNumber,
Summary = src.Summary,
Content = src.Content,
Time = src.OpenedAt,
});
graph.Link(caseNode, statusNode, Edges.HasStatus, Edges.OfStatus);
if (deviceNode is not null)
graph.Link(caseNode, deviceNode, Edges.ForDevice, Edges.HasSupportCase);
var i = 0;
foreach (var message in src.Messages)
{
var msgNode = graph.AddOrUpdate(new Nodes.SupportCaseMessage
{
Id = $"{src.ReferenceNumber}#{i++}", // stable, hash-free key
Body = message.Body,
Time = message.Time,
});
graph.Link(caseNode, msgNode, Edges.HasMessage, Edges.OfCase);
}
}
The key choices here:
- AddOrUpdate keeps the case's Summary/Content in sync if the source mutates them.
- Link takes both edge names so traversal works from either end.
- Id = $"{src.ReferenceNumber}#{i}" makes the message node deterministic across re-runs.
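The deterministic-key choice is easy to exercise in isolation. A minimal sketch, assuming a hypothetical MessageKeys helper (not part of Curiosity.Library) that mirrors the Id pattern used in MapCase: the same source record always produces the same keys, which is what makes re-runs idempotent.

```csharp
using System;
using System.Linq;

// Hypothetical helper mirroring the Id pattern in MapCase: the case
// reference plus the message's position in source order.
static string[] MessageKeys(string referenceNumber, int messageCount) =>
    Enumerable.Range(0, messageCount)
              .Select(i => $"{referenceNumber}#{i}")
              .ToArray();

var first  = MessageKeys("CASE-1042", 3);
var second = MessageKeys("CASE-1042", 3);

// Same input, same keys: AddOrUpdate hits the same nodes on a re-run
// instead of creating duplicates.
Console.WriteLine(string.Join(",", first));     // CASE-1042#0,CASE-1042#1,CASE-1042#2
Console.WriteLine(first.SequenceEqual(second)); // True
```

Contrast this with random GUIDs, which would mint a fresh node per run and defeat AddOrUpdate entirely.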
Step 3: ingest ACLs alongside the data
If the source has permissions, mirror them at ingest time — bolting ACLs on later means a window where data is searchable but unprotected.
// One-time team setup (idempotent).
var supportTeam = await graph.CreateTeamAsync("Tier-2 Support");
foreach (var member in src.AssignedAgents)
{
var user = await graph.CreateUserAsync(member.Login, member.Email, member.FirstName, member.LastName);
graph.AddUserToTeam(user, supportTeam);
}
// Mirror the source ACL onto the case node (the caseNode created during mapping).
graph.RestrictAccessToTeam(caseNode, supportTeam);
foreach (var watcher in src.IndividualWatchers)
{
var watcherUser = await graph.CreateUserAsync(watcher.Login, watcher.Email, watcher.FirstName, watcher.LastName);
graph.RestrictAccessToUser(caseNode, watcherUser);
}
After this:
- A query made through Graph.CreateSearchAsUserAsync(request, userUID) will only return cases the user has access to.
- Direct admin queries (Graph.CreateSearchAsync(request)) still see everything — that's intentional for batch / debugging code.
See Permission-aware search for the query-time half of this pattern.
Step 4: drive the loop with checkpointing
The cursor is the single source of truth for "how far did the connector get". Persist it outside process memory so a crash doesn't replay every record.
DateTimeOffset lastSync = await ReadCheckpointAsync() ?? DateTimeOffset.MinValue;
while (true)
{
var batch = await source.FetchChangedAsync(since: lastSync, pageSize: 500);
if (batch.Count == 0) break;
foreach (var record in batch)
{
var status = graph.AddOrUpdate(new Nodes.Status { Name = record.Status });
var device = record.Device is null ? null
: graph.AddOrUpdate(new Nodes.Device { Name = record.Device });
MapCase(graph, record, status, device);
}
await graph.CommitPendingAsync();
lastSync = batch.Max(r => r.UpdatedAt);
await WriteCheckpointAsync(lastSync);
}
Two non-obvious details:
- Commit before checkpoint. CommitPendingAsync must finish before you persist the new cursor — otherwise a crash between them re-ingests safely (idempotent), but if you reverse the order a crash can advance the cursor past data that never landed.
- Source ordering matters. The source must guarantee that records returned with updated_at <= lastSync are stable. If it doesn't (e.g. servers with clock skew), back the cursor up by a small grace period.
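Step 4 leaves ReadCheckpointAsync and WriteCheckpointAsync abstract. A minimal file-backed sketch, assuming a local checkpoint.txt and a five-minute grace period (both are deployment choices, not library features): the cursor round-trips as ISO-8601, and reads back it up by the grace period so source clock skew can't hide records.

```csharp
using System;
using System.Globalization;
using System.IO;
using System.Threading.Tasks;

// Assumed file location and grace period; tune both per deployment.
const string checkpointPath = "checkpoint.txt";
var grace = TimeSpan.FromMinutes(5);

async Task WriteCheckpointAsync(DateTimeOffset cursor) =>
    await File.WriteAllTextAsync(checkpointPath, cursor.ToString("o"));

async Task<DateTimeOffset?> ReadCheckpointAsync()
{
    if (!File.Exists(checkpointPath)) return null;   // first run: full backfill
    var text = await File.ReadAllTextAsync(checkpointPath);
    // Back the cursor up by the grace period; re-ingesting the overlap is
    // safe because all the upserts above are idempotent.
    return DateTimeOffset.Parse(text, null, DateTimeStyles.RoundtripKind) - grace;
}

await WriteCheckpointAsync(new DateTimeOffset(2024, 5, 1, 12, 0, 0, TimeSpan.Zero));
var resumed = await ReadCheckpointAsync();
// resumed is five minutes before the stored cursor.
Console.WriteLine(resumed);
```

In production you'd likely persist the cursor next to the data (a node in the workspace, or the connector's own database) rather than a local file, so a redeploy on a fresh machine doesn't trigger a full replay.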
Step 5: tune for backfills and high-volume runs
For an initial backfill, the per-batch commit pattern above is too chatty. Two knobs from Curiosity.Library:
graph.PauseIndexing("initial-backfill");
graph.SetAutoCommitCost(everyNodes: 10_000);
try
{
await IngestEverythingAsync(graph);
}
finally
{
await graph.CommitPendingAsync();
graph.ResumeIndexing("initial-backfill");
}
What changes:
- PauseIndexing defers search-index updates until you resume — the workspace rebuilds once at the end instead of after every batch.
- SetAutoCommitCost lets the library decide when to flush instead of you doing it per batch.
Don't leave indexing paused on a long-running connector — search results go stale until you call ResumeIndexing. Wrap the calls in try/finally so a thrown exception still resumes indexing.
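The pause/resume pairing gets fragile once there are multiple exit paths. One way to make it structural is an IDisposable guard, so `using` guarantees the resume. A sketch: the IndexingScope type is hypothetical, and the two lambdas stand in for the PauseIndexing / ResumeIndexing calls shown above.

```csharp
using System;

var paused = false;
try
{
    using var scope = new IndexingScope(
        pause:  () => paused = true,    // stand-in for graph.PauseIndexing("initial-backfill")
        resume: () => paused = false);  // stand-in for graph.ResumeIndexing("initial-backfill")

    throw new InvalidOperationException("mid-backfill crash");
}
catch (InvalidOperationException)
{
    // The connector would log and retry here.
}

// False: the guard resumed indexing despite the exception.
Console.WriteLine(paused);

// Hypothetical guard: runs the pause action immediately and the resume
// action on dispose, so every exit path (return or throw) resumes indexing.
sealed class IndexingScope : IDisposable
{
    private readonly Action _resume;
    public IndexingScope(Action pause, Action resume) { _resume = resume; pause(); }
    public void Dispose() => _resume();
}
```

This is the same guarantee the try/finally above gives you, just packaged so it can't be forgotten at a new early return.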
Step 6: validate after the run
A quick set of asserts catches the common ingestion bugs — duplicates, missing edges, orphan nodes.
// Counts
var caseCount = Q().StartAt(nameof(Nodes.SupportCase)).Count();
var deviceCount = Q().StartAt(nameof(Nodes.Device)).Count();
// Orphan check: every case must have exactly one status
var orphans = Q()
.StartAt(nameof(Nodes.SupportCase))
.Where(c => c.Out(Edges.HasStatus).Count() == 0)
.Take(10)
.Emit();
For longer-running connectors, expose these as a scheduled task that writes the result to a log node so you can dashboard it.
What to take away for production projects
- Stable keys, mapped from real source identifiers — no GUIDs.
- Explicit edges, declared with both directions.
- ACLs at ingest time, mirrored from the source.
- Checkpoint discipline — commit before persisting the cursor, never the other way around.
- Pause indexing for backfills, resume after.
Cross-links to product docs
- Connector templates: Connector templates
- Pipelines and orchestration: Ingestion pipelines
- Access model: Access control
- Query-time enforcement: Permission-aware search