Curiosity

Pipeline Orchestration

Connectors run on a schedule. This page covers how to schedule them (Airflow, GitHub Actions, cron, or the workspace's built-in scheduler) and the failure modes you need to plan for.

For the connector code itself see Connector templates. For the conceptual end-to-end flow see Ingestion pipelines.

End-to-end pipeline

flowchart LR
    src[(Source)] --> ext[Extract]
    ext --> tr[Transform / map]
    tr --> load[Load → Graph]
    load --> enr[Enrich / NLP]
    enr --> val[Validate]
    val -->|ok| done([Run complete])
    val -->|drift| alert[Alert + halt]
    cp[(Checkpoint)] --> ext
    ext --> cp

Each stage is a separate failure boundary. The orchestrator's job is to retry, alert, and persist the checkpoint between runs.

Orchestrators

Apache Airflow

Best when you have multi-source pipelines, dependencies between connectors, and existing Airflow infrastructure.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner":            "data-eng",
    "retries":          3,
    "retry_delay":      timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay":  timedelta(minutes=30),
}

with DAG(
    dag_id="curiosity_jira_sync",
    start_date=datetime(2026, 1, 1),
    schedule="*/15 * * * *",               # every 15 min (Airflow 2.4+; older versions use schedule_interval)
    catchup=False,
    default_args=default_args,
    max_active_runs=1,                     # don't overlap runs
) as dag:

    sync = BashOperator(
        task_id="jira_sync",
        bash_command="python /opt/connectors/jira/sync.py",
        env={"CURIOSITY_TOKEN": "{{ var.value.curiosity_token }}"},
        append_env=True,                   # keep PATH etc.; env alone replaces the inherited environment
    )

Conventions that work:

  • max_active_runs=1 to avoid two connector processes racing on the same checkpoint.
  • retries with exponential backoff for transient API failures.
  • on_failure_callback posting to your incident channel.
  • A separate DAG per source, not one DAG with many connectors — failures are independent.
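
The on_failure_callback convention can be sketched as follows. Airflow calls the callback with a single context dictionary. The message format and the post_to_incident_channel sender are hypothetical placeholders for whatever chat or paging integration your team uses.

```python
from typing import Any, Callable, Mapping


def format_failure(context: Mapping[str, Any]) -> str:
    """Build a one-line alert from the context Airflow passes to callbacks."""
    ti = context["task_instance"]
    error = context.get("exception")
    return (
        f"Pipeline failed: dag={ti.dag_id} task={ti.task_id} "
        f"run={context.get('run_id')} try={ti.try_number} error={error!r}"
    )


def make_failure_callback(
    send: Callable[[str], None],
) -> Callable[[Mapping[str, Any]], None]:
    """Return a function suitable for on_failure_callback.

    `send` is a hypothetical hook: pass a function that posts the text
    to your incident channel (webhook, pager, email).
    """
    def on_failure(context: Mapping[str, Any]) -> None:
        send(format_failure(context))

    return on_failure


# Placeholder sender (assumption); replace with a real integration.
post_to_incident_channel = print
notify_failure = make_failure_callback(post_to_incident_channel)
```

To wire it in, add "on_failure_callback": notify_failure to default_args so every task in the DAG inherits it.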

GitHub Actions

Best for low-volume / overnight sync jobs where the connector code already lives in the repo.

name: Curiosity sync
on:
  schedule:
    - cron: "0 2 * * *"          # 02:00 UTC daily
  workflow_dispatch:

concurrency:
  group: curiosity-sync          # only one job at a time
  cancel-in-progress: false

jobs:
  sync:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    steps:
      - uses: actions/checkout@v4

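      - name: Restore checkpoint
        uses: actions/cache/restore@v4
        with:
          path: .checkpoint
          key:  curiosity-sync-checkpoint-${{ github.run_id }}
          # cache keys are immutable, so fall back to the newest saved checkpoint by prefix
          restore-keys: |
            curiosity-sync-checkpoint-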

      - name: Sync
        env:
          CURIOSITY_TOKEN: ${{ secrets.CURIOSITY_TOKEN }}
        run: dotnet run --project Connectors/Jira

      - name: Save checkpoint
        uses: actions/cache/save@v4
        with:
          path: .checkpoint
          key:  curiosity-sync-checkpoint-${{ github.run_id }}

Notes:

  • concurrency.group prevents overlapping runs.
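  • Checkpoints in actions/cache are good enough for daily jobs, but GitHub evicts cache entries that go unaccessed for 7 days, so the cache is not durable storage. For higher cadence, or for checkpoints you cannot afford to lose, persist to S3 or a workspace node.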

Cron

Best when you have a single VM and don't want orchestration overhead.

*/15 * * * * /opt/curiosity/run-jira-sync.sh >> /var/log/curiosity/jira.log 2>&1

run-jira-sync.sh:

#!/usr/bin/env bash
set -euo pipefail

# single-instance lock
exec 9>/var/lock/curiosity-jira.lock
flock -n 9 || { echo "previous run still active"; exit 0; }

# assign first, then export, so a failed read aborts the script under set -e
CURIOSITY_TOKEN=$(cat /etc/curiosity/token)
export CURIOSITY_TOKEN

dotnet /opt/curiosity/connectors/jira/sync.dll \
  --checkpoint /var/lib/curiosity/jira.checkpoint

Notes:

  • flock prevents overlapping runs.
  • Token in /etc/curiosity/token, mode 0400, not in the script.
  • Output redirected to a log file rotated by logrotate.
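
If the connector is itself a Python script, the same single-instance guard can be taken in-process instead of in the wrapper. This is a minimal sketch, assuming a Unix host (the fcntl module is not available on Windows). The function name and lock path are illustrative.

```python
import fcntl
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def single_instance(lock_path: str) -> Iterator[bool]:
    """Yield True if this process holds the lock, False if another run does.

    Takes a non-blocking exclusive flock, the same mechanism `flock -n` uses.
    The lock is released when the file descriptor is closed on exit.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            acquired = True
        except BlockingIOError:
            # Another open file description already holds the lock.
            acquired = False
        yield acquired
    finally:
        os.close(fd)
```

A connector would wrap its entry point in `with single_instance("/var/lock/curiosity-jira.lock") as acquired:` and exit quietly when `acquired` is False, mirroring the `exit 0` in the shell wrapper.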

Workspace built-in scheduler

For pipelines that are a single C# function and need access to the workspace's graph, write them as scheduled tasks inside the workspace. See Scheduled tasks. The scheduler handles concurrency and logging; you skip the orchestrator entirely.

Scheduling patterns

Cadence              When to use
Every 5–15 minutes   High-volume sources where freshness matters (chat, support tickets).
Hourly               Mid-cadence sources (Jira, ServiceNow).
Daily / overnight    Slow-changing sources (HR systems, CRM master data).
Event-driven         When the source supports webhooks; gives the best end-to-end latency.

Match cadence to source rate-of-change, not source size. A 10 GB master-data table that changes weekly should sync daily, not every 5 minutes.

Failure recovery

The three failure classes you actually see in production:

  1. Source unavailable. Retry with exponential backoff (start 2s, cap 30s). After N retries, alert and stop — don't burn API quota infinite-looping.
  2. Transform error. A record doesn't fit the schema. Log the record's ID, skip it, continue the run. Don't halt the whole pipeline on one bad record.
  3. Workspace load error. Authentication, schema mismatch, network. Halt, alert, don't checkpoint.

Encode these distinctly in your code:

var skipped = 0;   // records dropped by transform errors (failure class 2)

foreach (var record in batch)
{
    try
    {
        MapToGraph(graph, record);
    }
    catch (MappingException ex)
    {
        await graph.LogAsync(LogLevel.Warning,
            $"Skipping {record.Id}: {ex.Message}");
        skipped++;
    }
}

await graph.CommitPendingAsync();   // load errors propagate up; that's intentional
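
Failure class 1 (source unavailable) can be handled by a small retry helper around the extract call. The sketch below is written in Python and uses the values from the list above (start 2 s, cap 30 s). The function names, the default of five retries, and the choice of ConnectionError and TimeoutError as the transient error types are assumptions, not part of any Curiosity API.

```python
import time
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(start: float = 2.0, cap: float = 30.0, attempts: int = 5) -> Iterator[float]:
    """Yield the wait before each retry, doubling from `start` and capped at `cap`."""
    delay = start
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= 2


def call_with_retry(
    fetch: Callable[[], T],
    max_retries: int = 5,
    transient: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(); on a transient error, wait and try again.

    Once max_retries retries have failed, the last error is re-raised so the
    orchestrator can alert and stop instead of looping forever.
    """
    delays = backoff_delays(attempts=max_retries)
    while True:
        try:
            return fetch()
        except transient:
            delay = next(delays, None)
            if delay is None:
                raise          # retries exhausted: propagate to the orchestrator
            sleep(delay)
```

With the defaults the waits are 2, 4, 8, 16, and 30 seconds. Errors outside `transient` are not retried, which keeps load errors (class 3) on the halt-and-alert path.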

Idempotency and checkpointing

Every pipeline run must be safe to re-run. The two ingredients:

  • Stable graph keys. TryAdd and AddOrUpdate are no-ops on unchanged records as long as the key is stable.
  • Persisted checkpoint. Track the cursor (last updated_at, last page token, last commit hash) somewhere durable — disk, S3, a workspace node. Update only after the commit succeeds.

A run that crashes mid-flight should resume from the last checkpoint and re-process the in-flight window. As long as keys are stable, re-processing updates the existing nodes instead of creating duplicates.
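
The "update only after the commit succeeds" ordering can be sketched in Python with a file-backed checkpoint. The JSON layout and the fetch_since and commit callables are hypothetical stand-ins for your source client and workspace load step. The write goes through a temp file and os.replace so that a crash never leaves a half-written checkpoint.

```python
import json
import os
import tempfile
from typing import Callable, List, Optional, Tuple


def load_cursor(path: str) -> Optional[str]:
    """Return the saved cursor, or None on the first run."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)["cursor"]
    except FileNotFoundError:
        return None


def save_cursor(path: str, cursor: str) -> None:
    """Write the cursor to a temp file, then atomically swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".checkpoint-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"cursor": cursor}, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)   # atomic rename on POSIX
    except BaseException:
        os.unlink(tmp)          # do not leave temp files behind on failure
        raise


def run_once(
    path: str,
    fetch_since: Callable[[Optional[str]], Tuple[List, Optional[str]]],
    commit: Callable[[List], None],
) -> int:
    """One incremental run: fetch, commit, then advance the checkpoint."""
    cursor = load_cursor(path)
    records, next_cursor = fetch_since(cursor)
    commit(records)             # raises on load error, leaving the checkpoint untouched
    if next_cursor is not None:
        save_cursor(path, next_cursor)
    return len(records)
```

If commit raises, the next run reads the old cursor and fetches the same window again, which is safe only because the graph keys are stable.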

Environment separation

Use three workspaces — dev, staging, prod — and three tokens, one per environment. The same connector binary runs against all three; only the env vars change.

CURIOSITY_URL=https://dev.example.com    CURIOSITY_TOKEN=… dotnet run     # dev
CURIOSITY_URL=https://staging.example.com CURIOSITY_TOKEN=… dotnet run     # staging
CURIOSITY_URL=https://prod.example.com    CURIOSITY_TOKEN=… dotnet run     # prod
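
On the connector side, reading both values from the environment and failing fast when one is missing keeps a misconfigured run from starting at all. A minimal Python sketch follows. CURIOSITY_URL and CURIOSITY_TOKEN are the variable names used above, while the WorkspaceConfig type and load_config function are illustrative.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkspaceConfig:
    url: str
    token: str


def load_config(env: Mapping[str, str] = os.environ) -> WorkspaceConfig:
    """Read the workspace URL and token; raise if either is unset or empty."""
    required = ("CURIOSITY_URL", "CURIOSITY_TOKEN")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return WorkspaceConfig(
        url=env["CURIOSITY_URL"].rstrip("/"),   # normalize a trailing slash
        token=env["CURIOSITY_TOKEN"],
    )
```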

Promote schema changes by exporting from staging and importing to prod (ExportWorkspaceDefinitionAsync / ImportWorkspaceDefinitionsAsync). See Upgrades and migrations.

Monitoring and alerting

Wire the workspace's metrics endpoints into your monitoring stack. The signals worth alerting on:

Alert                                        Trigger / likely cause
Pipeline run failed                          Orchestrator returned non-zero exit status.
Pipeline running longer than 2 × P95         Slow source, slow workspace, or hanging request.
Ingestion volume drops > 50% day-over-day    Source-side change, auth expiry, or filter regression.
Connector-side error rate > 1%               Mapping or auth issue.
Schema validation drift                      Source-side field renamed; map needs update.

See Administration → Monitoring for built-in dashboards and Metrics reference for the workspace's metric routes.
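
Two of the triggers above (run duration and ingestion volume) reduce to simple arithmetic. The Python sketch below assumes you already collect per-run durations and daily record counts. The function names and the nearest-rank percentile method are illustrative choices, not workspace features.

```python
from typing import Sequence


def p95(values: Sequence[float]) -> float:
    """Nearest-rank 95th percentile of a non-empty sample."""
    ordered = sorted(values)
    rank = (95 * len(ordered) + 99) // 100   # ceil(0.95 * n) in integer arithmetic
    return ordered[rank - 1]


def runs_too_long(current_s: float, history_s: Sequence[float]) -> bool:
    """True when the current run exceeds twice the historical P95 duration."""
    return current_s > 2 * p95(history_s)


def volume_dropped(today: int, yesterday: int, threshold: float = 0.5) -> bool:
    """True when today's record count fell by more than `threshold` vs. yesterday.

    A run that ingested 0 records after a non-empty day always alerts.
    """
    if yesterday <= 0:
        return False                          # no baseline to compare against
    return (yesterday - today) / yesterday > threshold
```

A single-day baseline is noisy for sources with weekly patterns. Comparing against the same weekday, or a trailing average, is a reasonable refinement.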

Secrets

Tokens, API keys, certificate paths — all in a secret manager (Vault, AWS Secrets Manager, GitHub Encrypted Secrets). Never in source. The orchestrator injects them as env vars at run time. Rotate quarterly; the workspace supports overlapping tokens during rotation.

Best practices

  • One DAG / workflow per source. Failures are independent; debugging is easier.
  • Always checkpoint after a successful commit, not before. Crashes mid-commit re-process the same window — that's fine, as long as your keys are stable.
  • Incremental over full. Full re-syncs are escape hatches, not the default.
  • Run validation as a separate stage. A run that "succeeded" but ingested 0 records should alert.
  • Log to the workspace. graph.LogAsync(...) makes pipeline events visible in the admin UI alongside system events. See Monitoring.

© 2026 Curiosity. All rights reserved.