Curiosity

Python SDK reference

The curiosity package is the official Python driver for Curiosity Workspace. Use it for connectors, ingestion scripts, and any pipeline that's easier to maintain in Python than in C#. It mirrors a large subset of the C# SDK, but is synchronous (Python doesn't need the async/await overhead the C# version carries) and slightly more compact.

Install

pip install curiosity

Requires requests>=2.25 and urllib3>=1.26 (installed automatically).

Connect

from curiosity import Graph

with Graph.connect(endpoint="https://my-workspace.example.com",
                   token="…",
                   connector_name="my-connector") as graph:
    # … writes, queries, …
    graph.commit_pending()

Graph is a context manager — using with ensures the queue is flushed and the connection is released. Equivalent for token, endpoint, and connector-name semantics as the C# Graph.Connect.

Parity with the C# SDK

The table below maps each Python method to the C# equivalent. Methods marked n/a are C#-only at the time of writing.

Python C# equivalent
Graph.connect(endpoint, token, connector_name) Graph.Connect(endpoint, token, connectorName)
graph.set_auto_commit_cost(every_nodes, every_edges=0) SetAutoCommitCost(everyNodes, everyEdges)
graph.commit_pending() CommitPendingAsync()
graph.close() / with Graph.connect(...) Dispose() / using
Schemas
graph.create_edge_schema(edges) CreateEdgeSchemaAsync(params string[])
graph.create_node_schema() CreateNodeSchemaAsync(ISchema, overwrite) (fluent only)
n/a CreateNodeSchemaAsync<T>(overwrite) (declarative attributes)
n/a GetNodeSchemaAsync(nodeType)
Nodes
graph.add_or_update_by_key(node_type, key, content) AddOrUpdate(Node, object content)
graph.add_or_update(node, content) AddOrUpdate(Node, object content)
graph.try_add_by_key(node_type, key, content) TryAdd(Node, object content)
graph.try_add(node, content) TryAdd(Node, object content)
graph.update_by_key(node_type, key, content) Update(Node, object content)
graph.update(node, content) Update(Node, object content)
graph.delete_by_key(node_type, key) / graph.delete(node) Delete(Node)
n/a *WithOwnership variants
Edges
graph.link(from_node, to_node, edge_type, unique=True) Link(from, to, edgeType, unique)
graph.link_bidirect(a, b, e_ab, e_ba, unique=True) Link(a, b, edgeAtoB, edgeBtoA, unique)
graph.unlink(from_node, to_node, edge_type, unique=True) Unlink(from, to, edgeType, unique)
graph.unlink_bidirect(a, b, e_ab, e_ba, unique=True) Unlink(a, b, edgeAtoB, edgeBtoA, unique)
n/a UnlinkExcept(...)
Aliases
graph.add_alias(from_node, language, alias, ignore_case) AddAlias(from, language, alias, ignoreCase)
graph.clear_alias(from_node) ClearAliases(from)
Files
graph.upload_file_by_path(file_path, filename, source_name, owner_user_or_group=None) UploadFileAsync(filePath, fileName, sourceName, ...)
graph.upload_file(file, filename, source_name, owner_user_or_group=None) UploadFileAsync(Stream, fileName, sourceName, ...)
graph.delete_file(filename, source_name, owner_user_or_group=None) DeleteFileAsync(fileName, sourceName)
graph.delete_folder(path, source_name, owner_user_or_group=None) DeleteFolderAsync(path, sourceName)
graph.mark_file_as_private(file_node) MarkFileAsPrivate(fileNode)
n/a UploadFileAndPermissionsAsync, UploadFileWithIdentifierAsync, UploadFileToFolderAsync, TryGetFileNodeAsync, GetFileHash
Users, teams, permissions
graph.create_user(user_name, email, first_name, last_name) CreateUserAsync(...)
graph.create_team(team_name, description=None) CreateTeamAsync(...)
graph.add_user_to_team(user, team) AddUserToTeam(user, team)
graph.add_admin_to_team(user, team) AddAdminToTeam(user, team)
graph.remove_user_from_team(user, team) RemoveUserFromTeam(user, team)
graph.restrict_access_to_team(node, team) RestrictAccessToTeam(node, team)
graph.restrict_access_to_user(node, user) RestrictAccessToUser(node, user)
n/a ClearPermissions, InitializeAccessControlAsync, CacheAccessControlAsync
Embeddings
graph.add_embeddings_to_index_async(index_uid, vectors, batch_size=1000) AddEmbeddingsToIndexAsync(indexUID, vectors, batchSize)
n/a ClearEmbeddingsIndexAsync
Queries
graph.query(query: Query) -> QueryResults QueryAsync(Action<IQuery>)
Logging
graph.log(message) / graph.log_error(message) LogAsync(level, message)
n/a LogManyAsync, ListenToWebSocketMessages, PauseIndexing, ResumeIndexing, MapAsync, ExportWorkspaceDefinitionAsync, ImportWorkspaceDefinitionsAsync, UploadNewApplicationInterfaceAsync

When you need a method that exists only in C#, the typical pattern is to call the workspace's REST API directly with requests — every SDK method delegates to the same HTTP surface.

A complete example

from curiosity import Graph, Node, Query

with Graph.connect(endpoint="https://workspace.example.com",
                   token=os.environ["CURIOSITY_TOKEN"],
                   connector_name="jira-sync") as graph:

    graph.set_auto_commit_cost(every_nodes=5000, every_edges=10000)

    graph.create_edge_schema(["AssignedTo", "ForDevice"])

    for ticket in fetch_jira_tickets():
        case = graph.try_add_by_key(
            node_type="SupportCase",
            key=ticket["id"],
            content={
                "Summary": ticket["summary"],
                "Status":  ticket["status"],
                "OpenedAt": ticket["created_at"],
            },
        )

        if ticket.get("device_id"):
            device = graph.try_add_by_key("Device", ticket["device_id"], {})
            graph.link(case, device, "ForDevice")

    graph.commit_pending()

Queries

The Python query builder mirrors the C# version with snake-case names.

from curiosity import Query

q = (Query()
     .start_at("Manufacturer")
     .is_related_to_type("Device")
     .out("SupportCase")
     .sort_by_timestamp(oldest_first=False)
     .take(20)
     .emit("N"))

results = graph.query(q)
for node in results.nodes("N"):
    print(node)

Method mapping

Python C#
start_at(n) StartAt(...)
out(node_type=None, edge_type=None) Out(...)
out_many(levels, node_types, edge_types, distinct=True) OutMany(...)
similar(index, count, tolerance) StartAtSimilarText(...) (different start)
of_type(t) / of_types(ts) type filter (post-traversal)
except_type(t) / except_types(ts) type exclusion
is_related_to(n, assume_bidirectional_edges=True) IsRelatedTo(...)
is_related_to_type(n) filter by neighbor type
is_not_related_to(n, ...) / is_not_related_to_type(n) inverse of above
sort_by_timestamp(oldest_first=True) SortByTimestamp(...)
sort_by_connectivity(most_connected_first=True) SortByConnectivity(...)
skip(count) / take(count) Skip(...) / Take(...)
emit(emit_key, fields=None) Emit("N") (alias becomes emit_key)
emit_count(emit_key) aggregation
emit_with_edges(emit_key, fields=None) EmitWithEdges()

Some C# methods aren't exposed yet — StartAtSimilarText with full options, EmitSummary, EmitNeighborsSummary, FromSearch, AsUIDEnumerable. For those, call the REST search endpoint directly or use a C# connector.

Async / multi-threading

The Python client is synchronous. If you need concurrency, run multiple connector processes (one per partition) rather than threading inside a single one — the write queue is not thread-safe.

Patterns and tips

  • Always use with. A leaked Graph instance leaves uncommitted writes.
  • Auto-commit by cost. For multi-million-node jobs, set every_nodes=10_000 so the queue doesn't grow unbounded.
  • Owners on file uploads. Pass owner_user_or_group directly when uploading files; the C# API has more overloads but the single-owner path is the common case.
  • Re-runnable connectors. Always key writes on stable identifiers (try_add_by_key / add_or_update_by_key) so re-runs are idempotent.
© 2026 Curiosity. All rights reserved.
Powered by Neko