Curiosity

Kafka / CDC recipe

Source: KafkaSample/ · Kafka topic with CDC-style JSON events, plus a JSONL fallback for offline development.

Owns in the academic graph: live enrollment events and the course, term, and student nodes they connect.

What it teaches

The three pillars of an idempotent stream consumer:

  1. Deterministic composite keys — same event always yields the same node.
  2. AddOrUpdate with last-write-wins on mutable properties (status, grade).
  3. Offset advancement after commit — never the other way around, so retries always replay safely.

sequenceDiagram
    participant K as Kafka topic
    participant C as Consumer
    participant G as Workspace graph
    participant O as Offset store
    loop per event
        K->>C: event(studentId, courseCode, term, status, occurredAt)
        C->>G: AddOrUpdate(Enrollment by composite key)
        C->>G: Link(student → enrollment → course → term)
    end
    C->>G: CommitPendingAsync()
    G-->>C: success
    C->>O: Advance partition offset
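The commit-then-advance ordering can be exercised without Kafka or Curiosity. This is a minimal sketch, not the recipe's code: InMemoryGraph, OffsetStore and BatchConsumer are hypothetical stand-ins (not the Curiosity Graph API or a Kafka client library), and a crash is simulated between the graph commit and the offset advance to show that the replay converges.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the workspace graph: staged writes become visible on commit.
public sealed class InMemoryGraph
{
    private readonly Dictionary<string, string> _committed = new();
    private readonly Dictionary<string, string> _pending = new();

    // Stage an upsert keyed by the composite key (idempotent by construction).
    public void AddOrUpdate(string key, string status) => _pending[key] = status;

    // Publish every staged write at once.
    public void CommitPending()
    {
        foreach (var (k, v) in _pending) _committed[k] = v;
        _pending.Clear();
    }

    public IReadOnlyDictionary<string, string> Committed => _committed;
}

// Hypothetical offset store: Next is the index of the next event to read.
public sealed class OffsetStore
{
    public long Next { get; private set; }
    public void Advance(long next) => Next = next;
}

public static class BatchConsumer
{
    // Processes events from store.Next onward as one batch: upsert, commit, then advance.
    // crashAfterCommit simulates a crash between the two steps; the offset is not
    // advanced, so the next run replays the same events.
    public static void RunOnce(
        IReadOnlyList<(string Key, string Status)> events,
        InMemoryGraph graph,
        OffsetStore store,
        bool crashAfterCommit = false)
    {
        for (int i = (int)store.Next; i < events.Count; i++)
        {
            var (key, status) = events[i];
            graph.AddOrUpdate(key, status);
        }

        graph.CommitPending();          // 1. graph first
        if (crashAfterCommit) return;   // simulated crash: offset never moves
        store.Advance(events.Count);    // 2. offset second
    }
}
```

Because the offset only moves after the commit succeeds, the worst case after a crash is reprocessing a batch whose writes are already in the graph, and the composite-key upsert makes that reprocessing harmless.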

Composite key + idempotent ingest

using System;
using System.Text.Json.Serialization;

public sealed class EnrollmentEvent
{
    [JsonPropertyName("studentId")]  public string StudentId  { get; set; } = string.Empty;
    [JsonPropertyName("courseCode")] public string CourseCode { get; set; } = string.Empty;
    [JsonPropertyName("term")]       public string Term       { get; set; } = string.Empty;
    [JsonPropertyName("status")]     public string Status     { get; set; } = string.Empty;
    [JsonPropertyName("grade")]      public string? Grade     { get; set; }   // absent until graded
    [JsonPropertyName("occurredAt")] public DateTimeOffset OccurredAt { get; set; }
}

public static class EnrollmentIngest
{
    // Same (student, course, term) triple → same key → same node, however often the event is replayed.
    public static string ComposeKey(EnrollmentEvent evt)
        => $"{evt.StudentId}/{evt.CourseCode}/{evt.Term}";

    public static void Ingest(Graph graph, EnrollmentEvent evt)
    {
        var key = ComposeKey(evt);

        // Upsert by composite key: a replayed event rewrites the node with identical values.
        var enrollment = graph.AddOrUpdate(new Nodes.Enrollment
        {
            Id        = key,
            Status    = evt.Status,
            Grade     = evt.Grade,
            UpdatedAt = evt.OccurredAt,  // last-write-wins: the last processed event sets the properties
        });

        var student = graph.TryAdd(new Nodes.Student { Id = evt.StudentId });
        graph.Link(student, enrollment, Edges.HasEnrollment, Edges.EnrolledIn);
    }
}
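As written, the ingest keeps whichever event is processed last. If events for one enrollment can arrive out of order, a timestamp guard turns that into true last-write-wins. A minimal sketch, assuming a dictionary in place of the graph; EnrollmentState and LastWriteWins are hypothetical names, not part of the recipe or the Curiosity API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for an Enrollment node's mutable properties.
public sealed record EnrollmentState(string Status, string? Grade, DateTimeOffset UpdatedAt);

public static class LastWriteWins
{
    // Same composite key format as the recipe's ComposeKey.
    public static string ComposeKey(string studentId, string courseCode, string term)
        => $"{studentId}/{courseCode}/{term}";

    // Applies an event only if it is at least as new as the stored state, so a
    // late-arriving older event cannot overwrite a newer one.
    // Returns false when the event was skipped as stale.
    public static bool Apply(
        Dictionary<string, EnrollmentState> store,
        string key, string status, string? grade, DateTimeOffset occurredAt)
    {
        if (store.TryGetValue(key, out var current) && occurredAt < current.UpdatedAt)
            return false; // stale: keep the newer state

        store[key] = new EnrollmentState(status, grade, occurredAt);
        return true;
    }
}
```

Equal timestamps are treated as "not stale", so replaying the newest event simply rewrites the same values.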

Configuration

Variable                 Default                 Purpose
RECIPE_KAFKA_BOOTSTRAP   (blank)                 Kafka bootstrap servers, e.g. localhost:9092 (blank → JSONL mode)
RECIPE_KAFKA_GROUP       curiosity-enrollments   Kafka consumer group ID
RECIPE_KAFKA_TOPIC       enrollments             Topic to consume
RECIPE_MAX_IDLE_SECONDS  5                       Idle timeout (seconds) before graceful shutdown
RECIPE_EVENTS_PATH       data/events.jsonl       JSONL fallback event file
RECIPE_OFFSET_PATH       data/.offset            Local offset store (JSONL mode)
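A minimal sketch of loading these variables with their defaults, assuming a hypothetical RecipeSettings record (the recipe's own configuration code is not shown). The lookup function is injected so the defaults can be checked without touching the process environment; in production, pass Environment.GetEnvironmentVariable.

```csharp
using System;

// Hypothetical settings type; property names mirror the environment variables above.
public sealed record RecipeSettings(
    string? KafkaBootstrap,
    string KafkaGroup,
    string KafkaTopic,
    TimeSpan MaxIdle,
    string EventsPath,
    string OffsetPath)
{
    // A blank bootstrap value selects the offline JSONL mode.
    public bool UseJsonl => string.IsNullOrWhiteSpace(KafkaBootstrap);

    public static RecipeSettings Load(Func<string, string?> lookup)
    {
        // Returns the trimmed value, or the fallback when unset or blank.
        string Get(string name, string fallback)
        {
            var value = lookup(name);
            return string.IsNullOrWhiteSpace(value) ? fallback : value.Trim();
        }

        var idleText = Get("RECIPE_MAX_IDLE_SECONDS", "5");
        if (!int.TryParse(idleText, out var idleSeconds) || idleSeconds <= 0)
            throw new FormatException($"RECIPE_MAX_IDLE_SECONDS must be a positive integer, got '{idleText}'.");

        return new RecipeSettings(
            KafkaBootstrap: lookup("RECIPE_KAFKA_BOOTSTRAP")?.Trim(),
            KafkaGroup: Get("RECIPE_KAFKA_GROUP", "curiosity-enrollments"),
            KafkaTopic: Get("RECIPE_KAFKA_TOPIC", "enrollments"),
            MaxIdle: TimeSpan.FromSeconds(idleSeconds),
            EventsPath: Get("RECIPE_EVENTS_PATH", "data/events.jsonl"),
            OffsetPath: Get("RECIPE_OFFSET_PATH", "data/.offset"));
    }
}
```

Usage: var settings = RecipeSettings.Load(Environment.GetEnvironmentVariable); then branch on settings.UseJsonl to pick the Kafka consumer or the JSONL reader.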

Reuse notes

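  • The recipe does not handle tombstones (hard deletes). If your CDC stream emits them (a Kafka record with a null value, which deserializes to null), add an if (evt is null) branch that removes the node. Because the value is empty, the node to delete has to be identified from the record key.
  • Kafka only guarantees ordering within a partition. If the producer keys records by enrollment, every event for one enrollment lands on the same partition and arrives in order, so redelivered events converge to the right state through the idempotent composite key. If events for one enrollment can still arrive out of order, compare OccurredAt with the stored UpdatedAt before overwriting.
  • Always commit the graph before advancing the offset: at-least-once delivery plus idempotent writes gives exactly-once results in the graph.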
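A tombstone carries no event body, so the deletion has to be keyed off the record key. A minimal sketch, assuming the producer sets each record key to the same studentId/courseCode/term composite key and using a dictionary in place of the graph; CdcEvent and TombstoneAwareIngest are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Subset of the recipe's EnrollmentEvent fields, under a hypothetical name.
public sealed class CdcEvent
{
    [JsonPropertyName("studentId")]  public string StudentId  { get; set; } = string.Empty;
    [JsonPropertyName("courseCode")] public string CourseCode { get; set; } = string.Empty;
    [JsonPropertyName("term")]       public string Term       { get; set; } = string.Empty;
    [JsonPropertyName("status")]     public string Status     { get; set; } = string.Empty;
}

public static class TombstoneAwareIngest
{
    // Assumption: recordKey is the composite key "studentId/courseCode/term".
    public static void Handle(IDictionary<string, string> statusByKey, string recordKey, string? recordValue)
    {
        // A null record value (or the JSON literal null) deserializes to null: a tombstone.
        var evt = recordValue is null ? null : JsonSerializer.Deserialize<CdcEvent>(recordValue);
        if (evt is null)
        {
            statusByKey.Remove(recordKey);  // hard delete; removing a missing key is a no-op
            return;
        }

        var key = $"{evt.StudentId}/{evt.CourseCode}/{evt.Term}";
        statusByKey[key] = evt.Status;      // idempotent upsert
    }
}
```

Deleting an already-missing key does nothing, so a replayed tombstone is as safe as a replayed upsert.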

Referenced by