Python SDK reference
The curiosity package is the official Python driver for Curiosity Workspace. Use it for connectors, ingestion scripts, and any pipeline that's easier to maintain in Python than in C#. It mirrors a large subset of the C# SDK, but is synchronous (Python doesn't need the async/await overhead the C# version carries) and slightly more compact.
Install
pip install curiosity
Requires requests>=2.25 and urllib3>=1.26 (installed automatically).
Connect
from curiosity import Graph

with Graph.connect(endpoint="https://my-workspace.example.com",
                   token="…",
                   connector_name="my-connector") as graph:
    # … writes, queries, …
    graph.commit_pending()
Graph is a context manager: using with ensures the queue is flushed and the connection is released. The endpoint, token, and connector-name parameters have the same semantics as in the C# Graph.Connect.
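Where a with block is impractical (for example, when the graph is owned by a longer-lived object), the same lifecycle can be approximated with graph.close(), which the parity table below lists as the counterpart of Dispose(). The following is a minimal sketch, not SDK code: run_connector, connect, and work are hypothetical names, and it assumes close() is safe to call after a failed run.

```python
def run_connector(connect, work):
    """Open a graph, run work(graph), commit on success, and always close.

    connect: zero-argument callable returning a Graph (hypothetical helper,
             e.g. a lambda wrapping Graph.connect(...)).
    work:    callable that performs the writes and queries on the graph.
    """
    graph = connect()
    try:
        work(graph)
        # Flush explicitly; this sketch does not rely on close() doing it.
        graph.commit_pending()
    finally:
        # Runs on both success and failure, mirroring what `with` guarantees.
        graph.close()
```

In this sketch a failure inside work skips the commit but still closes the graph. Whether that matches the with block's behaviour after an exception is not stated on this page, so treat it as a design choice of the sketch.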
Parity with the C# SDK
The table below maps each Python method to the C# equivalent. Methods marked n/a are C#-only at the time of writing.
| Python | C# equivalent |
|---|---|
| Graph.connect(endpoint, token, connector_name) | Graph.Connect(endpoint, token, connectorName) |
| graph.set_auto_commit_cost(every_nodes, every_edges=0) | SetAutoCommitCost(everyNodes, everyEdges) |
| graph.commit_pending() | CommitPendingAsync() |
| graph.close() / with Graph.connect(...) | Dispose() / using |
| Schemas | |
| graph.create_edge_schema(edges) | CreateEdgeSchemaAsync(params string[]) |
| graph.create_node_schema() | CreateNodeSchemaAsync(ISchema, overwrite) (fluent only) |
| n/a | CreateNodeSchemaAsync<T>(overwrite) (declarative attributes) |
| n/a | GetNodeSchemaAsync(nodeType) |
| Nodes | |
| graph.add_or_update_by_key(node_type, key, content) | AddOrUpdate(Node, object content) |
| graph.add_or_update(node, content) | AddOrUpdate(Node, object content) |
| graph.try_add_by_key(node_type, key, content) | TryAdd(Node, object content) |
| graph.try_add(node, content) | TryAdd(Node, object content) |
| graph.update_by_key(node_type, key, content) | Update(Node, object content) |
| graph.update(node, content) | Update(Node, object content) |
| graph.delete_by_key(node_type, key) / graph.delete(node) | Delete(Node) |
| n/a | *WithOwnership variants |
| Edges | |
| graph.link(from_node, to_node, edge_type, unique=True) | Link(from, to, edgeType, unique) |
| graph.link_bidirect(a, b, e_ab, e_ba, unique=True) | Link(a, b, edgeAtoB, edgeBtoA, unique) |
| graph.unlink(from_node, to_node, edge_type, unique=True) | Unlink(from, to, edgeType, unique) |
| graph.unlink_bidirect(a, b, e_ab, e_ba, unique=True) | Unlink(a, b, edgeAtoB, edgeBtoA, unique) |
| n/a | UnlinkExcept(...) |
| Aliases | |
| graph.add_alias(from_node, language, alias, ignore_case) | AddAlias(from, language, alias, ignoreCase) |
| graph.clear_alias(from_node) | ClearAliases(from) |
| Files | |
| graph.upload_file_by_path(file_path, filename, source_name, owner_user_or_group=None) | UploadFileAsync(filePath, fileName, sourceName, ...) |
| graph.upload_file(file, filename, source_name, owner_user_or_group=None) | UploadFileAsync(Stream, fileName, sourceName, ...) |
| graph.delete_file(filename, source_name, owner_user_or_group=None) | DeleteFileAsync(fileName, sourceName) |
| graph.delete_folder(path, source_name, owner_user_or_group=None) | DeleteFolderAsync(path, sourceName) |
| graph.mark_file_as_private(file_node) | MarkFileAsPrivate(fileNode) |
| n/a | UploadFileAndPermissionsAsync, UploadFileWithIdentifierAsync, UploadFileToFolderAsync, TryGetFileNodeAsync, GetFileHash |
| Users, teams, permissions | |
| graph.create_user(user_name, email, first_name, last_name) | CreateUserAsync(...) |
| graph.create_team(team_name, description=None) | CreateTeamAsync(...) |
| graph.add_user_to_team(user, team) | AddUserToTeam(user, team) |
| graph.add_admin_to_team(user, team) | AddAdminToTeam(user, team) |
| graph.remove_user_from_team(user, team) | RemoveUserFromTeam(user, team) |
| graph.restrict_access_to_team(node, team) | RestrictAccessToTeam(node, team) |
| graph.restrict_access_to_user(node, user) | RestrictAccessToUser(node, user) |
| n/a | ClearPermissions, InitializeAccessControlAsync, CacheAccessControlAsync |
| Embeddings | |
| graph.add_embeddings_to_index_async(index_uid, vectors, batch_size=1000) | AddEmbeddingsToIndexAsync(indexUID, vectors, batchSize) |
| n/a | ClearEmbeddingsIndexAsync |
| Queries | |
| graph.query(query: Query) -> QueryResults | QueryAsync(Action<IQuery>) |
| Logging | |
| graph.log(message) / graph.log_error(message) | LogAsync(level, message) |
| n/a | LogManyAsync, ListenToWebSocketMessages, PauseIndexing, ResumeIndexing, MapAsync, ExportWorkspaceDefinitionAsync, ImportWorkspaceDefinitionsAsync, UploadNewApplicationInterfaceAsync |
When you need a method that exists only in C#, the typical pattern is to call the workspace's REST API directly with requests — every SDK method delegates to the same HTTP surface.
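The shape of such a direct call might look like the sketch below. It only builds the request and does not send it. Several things here are assumptions rather than documented behaviour: the route /api/hypothetical/route is a placeholder and not a real endpoint, bearer-token authentication is assumed, and build_workspace_request is a hypothetical helper. The standard library is used so the sketch has no dependencies; with requests, the equivalent call is requests.post(url, json=payload, headers=headers).

```python
import json
import urllib.request


def build_workspace_request(endpoint, token, path, payload):
    """Build (but do not send) an authenticated JSON POST against the workspace."""
    url = endpoint.rstrip("/") + "/" + path.lstrip("/")
    body = json.dumps(payload).encode("utf-8")
    headers = {
        # Assumption: the workspace accepts the connector token as a bearer token.
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# Hypothetical route and payload: look up the real ones in your workspace's
# REST API documentation before sending anything.
request = build_workspace_request(
    "https://my-workspace.example.com",
    "<token>",
    "/api/hypothetical/route",
    {"example": True},
)
# To send it: urllib.request.urlopen(request)
```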
A complete example
import os

from curiosity import Graph

with Graph.connect(endpoint="https://workspace.example.com",
                   token=os.environ["CURIOSITY_TOKEN"],
                   connector_name="jira-sync") as graph:
    # Flush the write queue automatically once enough nodes or edges are pending.
    graph.set_auto_commit_cost(every_nodes=5000, every_edges=10000)
    graph.create_edge_schema(["AssignedTo", "ForDevice"])

    # fetch_jira_tickets() stands in for your own code that yields ticket dicts.
    for ticket in fetch_jira_tickets():
        case = graph.try_add_by_key(
            node_type="SupportCase",
            key=ticket["id"],
            content={
                "Summary": ticket["summary"],
                "Status": ticket["status"],
                "OpenedAt": ticket["created_at"],
            },
        )
        if ticket.get("device_id"):
            device = graph.try_add_by_key("Device", ticket["device_id"], {})
            graph.link(case, device, "ForDevice")

    # Flush whatever is still queued before the connection closes.
    graph.commit_pending()
Queries
The Python query builder mirrors the C# version, with snake_case method names.
from curiosity import Query

q = (Query()
     .start_at("Manufacturer")
     .is_related_to_type("Device")
     .out("SupportCase")
     .sort_by_timestamp(oldest_first=False)
     .take(20)
     .emit("N"))

# graph is an open Graph, as in the Connect section.
results = graph.query(q)
for node in results.nodes("N"):
    print(node)
Method mapping
| Python | C# |
|---|---|
| start_at(n) | StartAt(...) |
| out(node_type=None, edge_type=None) | Out(...) |
| out_many(levels, node_types, edge_types, distinct=True) | OutMany(...) |
| similar(index, count, tolerance) | StartAtSimilarText(...) (different start) |
| of_type(t) / of_types(ts) | type filter (post-traversal) |
| except_type(t) / except_types(ts) | type exclusion |
| is_related_to(n, assume_bidirectional_edges=True) | IsRelatedTo(...) |
| is_related_to_type(n) | filter by neighbor type |
| is_not_related_to(n, ...) / is_not_related_to_type(n) | inverse of above |
| sort_by_timestamp(oldest_first=True) | SortByTimestamp(...) |
| sort_by_connectivity(most_connected_first=True) | SortByConnectivity(...) |
| skip(count) / take(count) | Skip(...) / Take(...) |
| emit(emit_key, fields=None) | Emit("N") (alias becomes emit_key) |
| emit_count(emit_key) | aggregation |
| emit_with_edges(emit_key, fields=None) | EmitWithEdges() |
Some C# methods aren't exposed yet — StartAtSimilarText with full options, EmitSummary, EmitNeighborsSummary, FromSearch, AsUIDEnumerable. For those, call the REST search endpoint directly or use a C# connector.
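For the methods that are exposed, skip and take from the mapping above can be combined to page through a large result set. The sketch below is one way to do that under stated assumptions: iter_nodes_paged and build_query are hypothetical names, results.nodes(key) is assumed to return an iterable as in the example earlier in this section, and the query is assumed to have a stable sort order (for instance via sort_by_timestamp) so that pages do not overlap.

```python
def iter_nodes_paged(graph, build_query, emit_key="N", page_size=100):
    """Yield emitted nodes page by page until a page comes back short.

    build_query: zero-argument callable returning a fresh Query with the
                 traversal and sort applied, but without skip/take/emit.
    """
    offset = 0
    while True:
        query = build_query().skip(offset).take(page_size).emit(emit_key)
        page = list(graph.query(query).nodes(emit_key))
        yield from page
        if len(page) < page_size:
            # A short (or empty) page means there is nothing left to fetch.
            return
        offset += page_size
```

A call might look like iter_nodes_paged(graph, lambda: Query().start_at("Manufacturer").out("SupportCase").sort_by_timestamp(oldest_first=False)). Building a fresh Query per page avoids reusing a builder that already has skip and take applied.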
Async / multi-threading
The Python client is synchronous. If you need concurrency, run multiple connector processes (one per partition) rather than threading inside a single one — the write queue is not thread-safe.
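One way to arrange the one-process-per-partition layout is sketched below. It assigns each key to a partition with a stable hash, so every process independently agrees on ownership, and each worker opens its own Graph and therefore its own write queue. The names partition_of, run_partition, and run_all are hypothetical, as is the CURIOSITY_ENDPOINT environment variable. Whether partitions should share one connector name is not covered on this page, so the name from the Connect example is reused as-is.

```python
import hashlib
import os
from multiprocessing import Process
from typing import Sequence


def partition_of(key: str, partitions: int) -> int:
    """Map a key to a partition index that is identical across runs and processes."""
    # Python's built-in hash() is salted per process, so use a real digest instead.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partitions


def run_partition(index: int, partitions: int, keys: Sequence[str]) -> None:
    """Worker body: write only the keys that belong to this partition."""
    from curiosity import Graph  # imported here so each process builds its own client

    with Graph.connect(endpoint=os.environ["CURIOSITY_ENDPOINT"],  # hypothetical variable
                       token=os.environ["CURIOSITY_TOKEN"],
                       connector_name="my-connector") as graph:
        for key in keys:
            if partition_of(key, partitions) == index:
                graph.try_add_by_key("Device", key, {})
        graph.commit_pending()


def run_all(keys: Sequence[str], partitions: int = 4) -> None:
    """Start one process per partition and wait for all of them to finish."""
    workers = [Process(target=run_partition, args=(i, partitions, keys))
               for i in range(partitions)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
```

Call run_all from under an if __name__ == "__main__": guard, which multiprocessing requires on platforms that spawn rather than fork.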
Patterns and tips
- Always use with. A leaked Graph instance leaves uncommitted writes.
- Auto-commit by cost. For multi-million-node jobs, set every_nodes=10_000 so the queue doesn't grow unbounded.
- Owners on file uploads. Pass owner_user_or_group directly when uploading files; the C# API has more overloads but the single-owner path is the common case.
- Re-runnable connectors. Always key writes on stable identifiers (try_add_by_key / add_or_update_by_key) so re-runs are idempotent.
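The auto-commit and re-runnable-connector tips can be combined in a small ingestion helper. This is a sketch under assumptions: sync_devices is a hypothetical function, and the serial and model fields and the Model property are illustrative names for your own data. It uses only the calls listed in the parity table.

```python
def sync_devices(graph, devices):
    """Upsert one Device node per record, keyed on its serial number."""
    # Bound the write queue on large jobs (every_edges keeps its default of 0).
    graph.set_auto_commit_cost(every_nodes=10_000)
    for device in devices:
        graph.add_or_update_by_key(
            node_type="Device",
            key=device["serial"],  # stable identifier from the source system
            content={"Model": device["model"]},
        )
    # Flush whatever remains below the auto-commit threshold.
    graph.commit_pending()
```

Because every write is keyed on the serial number, running sync_devices twice over the same input should leave the same set of nodes rather than creating duplicates.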
Related pages
- C# SDK — the more complete API.
- Schema reference
- Graph query language
- Data integration → Connectors