MongoDB recipe
Source: MongoSample/ · MongoDB collection (BSON documents). Optionally tails a change stream for near-realtime updates. Local JSON fallback for offline development.
Owns in the academic graph: student profiles, internships, projects, companies, interests.
What it teaches
- Schemaless → typed — BSON to relaxed-extended JSON to a C# POCO.
- Snapshot + tail pattern — initial backfill, then optionally follow the change stream.
- Nested-array projection: `internships[]` and `projects[]` from the document fan out into typed nodes and edges.
- Idempotent re-ingestion via `AddOrUpdate` keyed by `studentId`.
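The "schemaless → typed" step can be sketched with `System.Text.Json` alone. The snippet assumes the BSON document has already been rendered as relaxed Extended JSON text. The `ProfileDoc` and `InternshipDoc` shapes are hypothetical stand-ins inferred from the fields this recipe reads, not the sample's actual `ProfilesIngest.ProfileDoc`.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical shapes inferred from the fields this recipe reads;
// the real ProfilesIngest.ProfileDoc may differ.
public sealed class InternshipDoc
{
    public string Company { get; set; } = "";
    public string Role { get; set; } = "";
    public int StartYear { get; set; }
    public int? EndYear { get; set; }
}

public sealed class ProfileDoc
{
    public string StudentId { get; set; } = "";
    public List<InternshipDoc> Internships { get; set; } = new();
}

public static class ProfileJson
{
    // Web defaults match property names case-insensitively, so "studentId"
    // in the JSON binds to StudentId on the POCO.
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    public static ProfileDoc Parse(string relaxedJson)
    {
        var doc = JsonSerializer.Deserialize<ProfileDoc>(relaxedJson, Options)
                  ?? throw new JsonException("Document was JSON null");
        // An explicit "internships": null overwrites the default empty list.
        doc.Internships ??= new List<InternshipDoc>();
        return doc;
    }
}
```

Relaxed mode writes ordinary integers as plain JSON numbers, which is why `StartYear` binds without a custom converter. Members the POCO does not declare, such as `_id`, are skipped by default; this is the silent drift the reuse notes warn about.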
Snapshot + change-stream loop
// 1. Initial snapshot: backfill every document currently in the collection.
var initial = 0;
await foreach (var profile in source.StreamAsync<ProfilesIngest.ProfileDoc>(collectionName))
{
    ProfilesIngest.Ingest(graph, profile);
    initial++;
}
await graph.CommitPendingAsync();
logger.LogInformation("Ingested {Count} profiles from initial snapshot", initial);

// 2. Optional follow: only a live MongoSource can tail the change stream.
if (followChanges && source is MongoSource liveSource)
{
    logger.LogInformation("Following change stream — Ctrl+C to stop");
    var changed = 0;
    await foreach (var profile in liveSource.StreamChangesAsync<ProfilesIngest.ProfileDoc>(collectionName, cts.Token))
    {
        ProfilesIngest.Ingest(graph, profile);
        // Commit every 50 changes so pending work stays bounded.
        if (++changed % 50 == 0) await graph.CommitPendingAsync();
    }
    // Flush whatever is left from the last partial batch.
    await graph.CommitPendingAsync();
}
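One detail of the loop above depends on how cancellation surfaces. If `StreamChangesAsync` reports Ctrl+C by throwing `OperationCanceledException` (an assumption; it may instead end the stream quietly), the final commit would be skipped. The sketch below shows the same snapshot-then-tail shape with that case handled. `SnapshotThenTail` and `Feed` are hypothetical names, and the streams are in-memory stand-ins for the real source.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class SnapshotThenTail
{
    // Drains the snapshot, commits, then follows `changes` until it ends or
    // the token is cancelled. Returns the number of items ingested.
    public static async Task<int> RunAsync<T>(
        IAsyncEnumerable<T> snapshot,
        IAsyncEnumerable<T> changes,
        Action<T> ingest,
        Func<Task> commit,
        int batchSize,
        CancellationToken token)
    {
        var total = 0;
        await foreach (var item in snapshot.WithCancellation(token))
        {
            ingest(item);
            total++;
        }
        await commit();

        var changed = 0;
        try
        {
            await foreach (var item in changes.WithCancellation(token))
            {
                ingest(item);
                total++;
                if (++changed % batchSize == 0) await commit();
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation lands here; fall through so the partial batch is committed.
        }
        await commit();
        return total;
    }

    // In-memory stand-in for a source: yields the items, then optionally
    // waits until cancelled, the way a live change stream would.
    public static async IAsyncEnumerable<T> Feed<T>(
        IEnumerable<T> items, bool waitAfter,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        foreach (var item in items)
        {
            await Task.Yield();
            yield return item;
        }
        if (waitAfter) await Task.Delay(Timeout.Infinite, token);
    }
}
```

Because ingestion is idempotent, committing a partial batch on shutdown is safe: if the same changes are replayed on the next run, they update existing nodes rather than duplicating them.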
Nested-array projection
foreach (var ship in doc.Internships)
{
    // An internship without a company name has nothing to link to.
    if (string.IsNullOrWhiteSpace(ship.Company)) continue;

    var company = graph.AddOrUpdate(new Nodes.Company
    {
        Name = ship.Company,
        Industry = ship.Industry,
        Website = ship.Website,
    });

    var internship = graph.AddOrUpdate(new Nodes.Internship
    {
        // Composite key: re-ingesting the same document updates this node
        // instead of creating a duplicate.
        Id = $"{doc.StudentId}/{ship.Company}/{ship.StartYear}",
        Role = ship.Role,
        StartYear = ship.StartYear,
        EndYear = ship.EndYear,
    });

    graph.Link(student, internship, Edges.Held, Edges.HeldBy);
    graph.Link(internship, company, Edges.At, Edges.Hosted);
}
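Why the composite `Id` makes re-ingestion idempotent can be shown with a toy in-memory graph. `TinyGraph`, `Internship`, and `InternshipKeys` below are hypothetical illustrations, not the recipe's graph API; the only thing carried over from the snippet above is the key format.

```csharp
using System;
using System.Collections.Generic;

public sealed record Internship(string Id, string Role, int StartYear, int? EndYear);

// Toy stand-in for the graph: nodes are keyed by Id, edges are a set.
public sealed class TinyGraph
{
    private readonly Dictionary<string, Internship> _internships = new();
    private readonly HashSet<(string From, string To, string Edge)> _edges = new();

    public int NodeCount => _internships.Count;
    public int EdgeCount => _edges.Count;

    // Upsert: a node with an existing Id replaces the stored one.
    public Internship AddOrUpdate(Internship node)
    {
        _internships[node.Id] = node;
        return node;
    }

    // Adding the same edge twice leaves the set unchanged.
    public void Link(string fromId, string toId, string edge) => _edges.Add((fromId, toId, edge));

    public Internship Get(string id) => _internships[id];
}

public static class InternshipKeys
{
    // Same format as the recipe: studentId/company/startYear.
    public static string For(string studentId, string company, int startYear)
        => $"{studentId}/{company}/{startYear}";
}
```

The key format has a trade-off worth knowing: two internships at the same company starting in the same year collapse into one node, and a change in the company's spelling produces a new node rather than an update.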
Configuration
| Variable | Purpose | Default |
|---|---|---|
| `RECIPE_MONGO_URI` | e.g. `mongodb://localhost:27017` (blank → local mode) | (blank) |
| `RECIPE_MONGO_DB` | Database | `students` |
| `RECIPE_MONGO_COLLECTION` | Collection | `profiles` |
| `RECIPE_FOLLOW_CHANGES` | Set to `1` to tail change streams | (off) |
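A minimal sketch of how these variables might be read, using the defaults from the table. `MongoRecipeConfig` is a hypothetical type, not part of the sample. Treating only the exact value `1` as "follow changes" is an assumption based on the table's wording.

```csharp
#nullable enable
using System;

public sealed record MongoRecipeConfig(string Uri, string Database, string Collection, bool FollowChanges)
{
    // A blank URI means local mode: use the JSON fallback instead of connecting.
    public bool IsLocalMode => Uri.Length == 0;

    public static MongoRecipeConfig FromEnvironment() => From(Environment.GetEnvironmentVariable);

    // `read` is injectable so the parsing can be exercised without touching
    // the real process environment.
    public static MongoRecipeConfig From(Func<string, string?> read)
    {
        static string OrDefault(string? value, string fallback)
            => string.IsNullOrWhiteSpace(value) ? fallback : value.Trim();

        return new MongoRecipeConfig(
            Uri: OrDefault(read("RECIPE_MONGO_URI"), ""),
            Database: OrDefault(read("RECIPE_MONGO_DB"), "students"),
            Collection: OrDefault(read("RECIPE_MONGO_COLLECTION"), "profiles"),
            // Assumption: only the exact value "1" switches following on.
            FollowChanges: read("RECIPE_FOLLOW_CHANGES")?.Trim() == "1");
    }
}
```

With nothing set, `MongoRecipeConfig.FromEnvironment()` yields local mode against `students`/`profiles` with following off, which matches the offline development path described at the top.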
Reuse notes
- Change streams need a replica set or sharded cluster — not available on standalone Mongo.
- Persist the resume token after each commit for production-grade resumability.
- Schema drift is silent: validate critical fields in `Ingest`, not in deserialization.
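The resume-token note can be sketched as a small file-backed store. How `MongoSource` exposes the token is not shown in this recipe, so the sketch treats it as opaque JSON text. `FileResumeTokenStore` is a hypothetical helper, and a real deployment might keep the token in a database instead.

```csharp
#nullable enable
using System;
using System.IO;

// Hypothetical helper: keeps the latest change-stream resume token on disk.
public sealed class FileResumeTokenStore
{
    private readonly string _path;

    public FileResumeTokenStore(string path) => _path = path;

    // Returns null on the first run, when no token has been saved yet.
    public string? Load() => File.Exists(_path) ? File.ReadAllText(_path) : null;

    // Write to a temp file, then move it over the old one, so a crash
    // mid-write leaves the previous token in place.
    public void Save(string tokenJson)
    {
        var temp = _path + ".tmp";
        File.WriteAllText(temp, tokenJson);
        File.Move(temp, _path, overwrite: true);
    }
}
```

Saving the token only after `CommitPendingAsync` succeeds gives at-least-once delivery: after a crash, the changes since the last saved token are replayed. That replay is harmless because ingestion goes through `AddOrUpdate`.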