# Pipeline Orchestration
Connectors run on a schedule. This page is about the how — Airflow, GitHub Actions, cron, or the workspace's built-in scheduler — and the failure modes you need to plan for.
For the connector code itself, see Connector templates. For the conceptual end-to-end flow, see Ingestion pipelines.
## End-to-end pipeline
A connector run walks four stages: fetch from the source, transform into graph records, load into the workspace, checkpoint. Each stage is a separate failure boundary. The orchestrator's job is to retry, alert, and persist the checkpoint between runs.
## Orchestrators
### Apache Airflow
Best when you have multi-source pipelines, dependencies between connectors, and existing Airflow infrastructure.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}

with DAG(
    dag_id="curiosity_jira_sync",
    start_date=datetime(2026, 1, 1),
    schedule_interval="*/15 * * * *",  # every 15 min
    catchup=False,
    default_args=default_args,
    max_active_runs=1,  # don't overlap runs
) as dag:
    sync = BashOperator(
        task_id="jira_sync",
        bash_command="python /opt/connectors/jira/sync.py",
        env={"CURIOSITY_TOKEN": "{{ var.value.curiosity_token }}"},
    )
```
Conventions that work:
- `max_active_runs=1` to avoid two connector processes racing on the same checkpoint.
- `retries` with exponential backoff for transient API failures.
- `on_failure_callback` posting to your incident channel (sketch below).
- A separate DAG per source, not one DAG with many connectors — failures are independent.
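A minimal `on_failure_callback` sketch. The webhook URL and the `requests` dependency are assumptions for illustration; Airflow passes the failing task's context to the callback:

```python
import requests  # assumed to be installed on the Airflow workers

def notify_failure(context):
    """Post the failing task's details to an incident channel."""
    ti = context["task_instance"]
    requests.post(
        "https://hooks.example.com/incidents",  # hypothetical webhook URL
        json={
            "text": f"{ti.dag_id}.{ti.task_id} failed "
                    f"(try {ti.try_number}): {context.get('exception')}"
        },
        timeout=10,
    )

default_args = {
    # ...retries and backoff as above...
    "on_failure_callback": notify_failure,
}
```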
### GitHub Actions
Best for low-volume / overnight sync jobs where the connector code already lives in the repo.
```yaml
name: Curiosity sync

on:
  schedule:
    - cron: "0 2 * * *"  # 02:00 UTC daily
  workflow_dispatch:

concurrency:
  group: curiosity-sync  # only one job at a time
  cancel-in-progress: false

jobs:
  sync:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    steps:
      - uses: actions/checkout@v4
      - name: Restore checkpoint
        uses: actions/cache/restore@v4
        with:
          path: .checkpoint
          key: curiosity-sync-checkpoint-${{ github.run_id }}
          restore-keys: curiosity-sync-checkpoint-  # newest previous run
      - name: Sync
        env:
          CURIOSITY_TOKEN: ${{ secrets.CURIOSITY_TOKEN }}
        run: dotnet run --project Connectors/Jira
      - name: Save checkpoint
        uses: actions/cache/save@v4
        with:
          path: .checkpoint
          key: curiosity-sync-checkpoint-${{ github.run_id }}
```
Notes:

- `concurrency.group` prevents overlapping runs.
- Cache entries are immutable, so each run saves under a fresh key and the restore step prefix-matches the newest previous one via `restore-keys`.
- Checkpoints in `actions/cache` are good enough for daily jobs — for higher cadence, persist to S3 or a workspace node (see the sketch below).
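If you outgrow the cache, the two checkpoint steps can be swapped for object storage. A sketch assuming AWS credentials in repository secrets and an existing bucket; the bucket name and region are placeholders:

```yaml
      - name: Restore checkpoint
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          AWS_DEFAULT_REGION: eu-west-1  # placeholder
        # First run has no object yet; "|| true" keeps the step green.
        run: aws s3 cp s3://my-bucket/checkpoints/jira .checkpoint || true

      - name: Save checkpoint
        if: success()  # persist only after a clean sync
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          AWS_DEFAULT_REGION: eu-west-1  # placeholder
        run: aws s3 cp .checkpoint s3://my-bucket/checkpoints/jira
```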
### Cron
Best when you have a single VM and don't want orchestration overhead.
```
*/15 * * * * /opt/curiosity/run-jira-sync.sh >> /var/log/curiosity/jira.log 2>&1
```

`run-jira-sync.sh`:
```bash
#!/usr/bin/env bash
set -euo pipefail

# single-instance lock
exec 9>/var/lock/curiosity-jira.lock
flock -n 9 || { echo "previous run still active"; exit 0; }

export CURIOSITY_TOKEN=$(cat /etc/curiosity/token)
dotnet /opt/curiosity/connectors/jira/sync.dll \
  --checkpoint /var/lib/curiosity/jira.checkpoint
```
- `flock` prevents overlapping runs.
- Token in `/etc/curiosity/token`, mode 0400, not in the script.
- Output redirected to a log file rotated by `logrotate` (example below).
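A matching `logrotate` drop-in, e.g. `/etc/logrotate.d/curiosity-jira`; the retention is a placeholder:

```
/var/log/curiosity/jira.log {
    daily
    rotate 14        # keep two weeks; adjust to your audit needs
    compress
    missingok
    notifempty
    copytruncate     # safe even if a long sync still holds the file open
}
```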
### Workspace built-in scheduler
For pipelines that are a single C# function and need access to the workspace's graph, write them as scheduled tasks inside the workspace. See Scheduled tasks. The scheduler handles concurrency and logging; you skip the orchestrator entirely.
## Scheduling patterns
| Cadence | When to use |
|---|---|
| Every 5–15 minutes | High-volume sources where freshness matters (chat, support tickets). |
| Hourly | Mid-cadence sources (Jira, ServiceNow). |
| Daily / overnight | Slow-changing sources (HR systems, CRM master data). |
| Event-driven | When the source supports webhooks — best end-to-end latency. |
Match cadence to source rate-of-change, not source size. A 10 GB master-data table that changes weekly should sync daily, not every 5 minutes.
## Failure recovery
The three failure classes you actually see in production:
- Source unavailable. Retry with exponential backoff (start 2s, cap 30s). After N retries, alert and stop — don't burn API quota infinite-looping.
- Transform error. A record doesn't fit the schema. Log the record's ID, skip it, continue the run. Don't halt the whole pipeline on one bad record.
- Workspace load error. Authentication, schema mismatch, network. Halt, alert, don't checkpoint.
Encode these distinctly in your code:
```csharp
foreach (var record in batch)
{
    try
    {
        MapToGraph(graph, record);
    }
    catch (MappingException ex)
    {
        await graph.LogAsync(LogLevel.Warning,
            $"Skipping {record.Id}: {ex.Message}");
        skipped++;
    }
}

await graph.CommitPendingAsync(); // load errors propagate up; that's intentional
```
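For the first class, source unavailability, a backoff wrapper sketch. `ISource`, `Batch`, and `FetchPageAsync` are hypothetical stand-ins for your source client; only the retry shape is the point:

```csharp
// Retry transient fetch failures with exponential backoff (start 2 s, cap 30 s).
// After maxRetries the exception propagates, the run fails, and the orchestrator alerts.
static async Task<Batch> FetchWithBackoffAsync(ISource source, string cursor)
{
    var delay = TimeSpan.FromSeconds(2);
    var maxDelay = TimeSpan.FromSeconds(30);
    const int maxRetries = 5;

    for (var attempt = 1; ; attempt++)
    {
        try
        {
            return await source.FetchPageAsync(cursor); // hypothetical client call
        }
        catch (HttpRequestException) when (attempt < maxRetries)
        {
            await Task.Delay(delay);
            delay = delay * 2 > maxDelay ? maxDelay : delay * 2;
        }
    }
}
```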
## Idempotency and checkpointing
Every pipeline run must be safe to re-run. The two ingredients:
- Stable graph keys. `TryAdd` and `AddOrUpdate` are no-ops on unchanged records as long as the key is stable.
- Persisted checkpoint. Track the cursor (last `updated_at`, last page token, last commit hash) somewhere durable — disk, S3, a workspace node. Update only after the commit succeeds.
A run that crashes mid-flight should resume from the last checkpoint and re-process the in-flight window. As long as keys are stable, duplicates are impossible.
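A file-based sketch of that ordering. `FetchChangedSinceAsync` and `MapBatch` are hypothetical; `CommitPendingAsync` is the same commit as in the failure-recovery example:

```csharp
var checkpointPath = "/var/lib/curiosity/jira.checkpoint";

// Resume from the last committed cursor, or from scratch on the first run.
var since = File.Exists(checkpointPath)
    ? DateTimeOffset.Parse(File.ReadAllText(checkpointPath))
    : DateTimeOffset.MinValue;

var batch = await source.FetchChangedSinceAsync(since); // hypothetical
var newCursor = MapBatch(graph, batch);                 // hypothetical; returns the max updated_at seen

await graph.CommitPendingAsync();

// Persist only after the commit succeeded. A crash before this line
// re-processes the same window, and stable keys make the re-run a no-op.
File.WriteAllText(checkpointPath, newCursor.ToString("o"));
```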
## Environment separation
Use three workspaces — dev, staging, prod — and three tokens, one per environment. The same connector binary runs against all three; only the env vars change.
```bash
CURIOSITY_URL=https://dev.example.com     CURIOSITY_TOKEN=… dotnet run  # dev
CURIOSITY_URL=https://staging.example.com CURIOSITY_TOKEN=… dotnet run  # staging
CURIOSITY_URL=https://prod.example.com    CURIOSITY_TOKEN=… dotnet run  # prod
```
Promote schema changes by exporting from staging and importing to prod (`ExportWorkspaceDefinitionAsync` / `ImportWorkspaceDefinitionsAsync`). See Upgrades and migrations.
## Monitoring and alerting
Wire the workspace's metrics endpoints into your monitoring stack. The signals worth alerting on:
| Alert | Trigger |
|---|---|
| Pipeline run failed | Orchestrator returned non-zero exit status. |
| Pipeline running longer than P95 × 2 | Slow source, slow workspace, or hanging request. |
| Ingestion volume drops > 50% day-over-day | Source-side change, auth expiry, or filter regression. |
| Connector-side error rate > 1% | Mapping or auth issue. |
| Schema validation drift | Source-side field renamed; map needs update. |
See Administration → Monitoring for built-in dashboards and Metrics reference for the workspace's metric routes.
## Secrets
Tokens, API keys, certificate paths — all in a secret manager (Vault, AWS Secrets Manager, GitHub Encrypted Secrets). Never in source. The orchestrator injects them as env vars at run time. Rotate quarterly; the workspace supports overlapping tokens during rotation.
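On the connector side that reduces to a fail-fast read; the variable name matches the examples above:

```csharp
// Fail fast if the orchestrator didn't inject the token:
// a clear startup error beats an opaque 401 three stages later.
var token = Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")
    ?? throw new InvalidOperationException("CURIOSITY_TOKEN is not set");
```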
## Best practices
- One DAG / workflow per source. Failures are independent; debugging is easier.
- Always checkpoint after a successful commit, not before. Crashes mid-commit re-process the same window — that's fine, as long as your keys are stable.
- Incremental over full. Full re-syncs are escape hatches, not the default.
- Run validation as a separate stage. A run that "succeeded" but ingested 0 records should alert (see the sketch after this list).
- Log to the workspace. `graph.LogAsync(...)` makes pipeline events visible in the admin UI alongside system events. See Monitoring.
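A zero-record check sketch. `ingested` is an assumed counter maintained alongside `skipped` from the failure-recovery example; the non-zero exit is what trips the "pipeline run failed" alert:

```csharp
// A run that commits nothing is usually a silent upstream failure:
// expired auth, a renamed field, or an over-tight filter.
if (ingested == 0)
{
    await graph.LogAsync(LogLevel.Error,
        $"Run finished but ingested 0 records ({skipped} skipped)");
    Environment.Exit(1); // non-zero exit status triggers the orchestrator alert
}
```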
## Where to go next
- Connectors — what runs inside the loop.
- Ingestion pipelines — the conceptual model.
- Connector templates — starter projects.
- Scheduled tasks — built-in scheduling inside the workspace.
- Token scopes — credentials for connectors.