
Parquet Connector

Stream Apache Parquet files into the graph row-by-row using Parquet.Net. Parquet is columnar and compressed; a single file often holds hundreds of millions of rows. Parquet.Net reads one row group at a time, so memory stays bounded regardless of file size.

Packages

Curiosity.Library on NuGet
Parquet.Net on NuGet

dotnet add package Curiosity.Library
dotnet add package Parquet.Net

Expected source shape

A Parquet file whose schema includes the columns you want to ingest. For this example, transactions.parquet is assumed to have:

Column        Parquet type              C# type
tx_id         BYTE_ARRAY (UTF-8)        string
customer      BYTE_ARRAY (UTF-8)        string
amount_cents  INT64                     long
occurred_at   INT64 (TIMESTAMP_MILLIS)  DateTime (UTC)
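
If you don't have such a file handy, you can generate one for local testing with Parquet.Net's ParquetSerializer.SerializeAsync. A minimal sketch, reusing the TransactionRow DTO from the connector code below; the sample values are made up:

using Parquet.Serialization;

var sample = new[]
{
    new TransactionRow { tx_id = "t-1", customer = "acme",   amount_cents = 1_250, occurred_at = DateTime.UtcNow },
    new TransactionRow { tx_id = "t-2", customer = "globex", amount_cents = 990,   occurred_at = DateTime.UtcNow },
};

using var outStream = File.Create("transactions.parquet");
await ParquetSerializer.SerializeAsync(sample, outStream);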

Connector code

Program.cs
using Curiosity.Library;
using Parquet.Serialization;

// Top-level statements must come before any type declarations,
// so the executable code goes first and the classes follow at the end of the file.
using var graph = Graph.Connect(
    endpoint:      Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token:         Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "parquet-transactions");

await graph.CreateNodeSchemaAsync<Transaction>();
graph.SetAutoCommitCost(everyNodes: 10_000);

var path = args.Length > 0 ? args[0] : "transactions.parquet";
using var stream = File.OpenRead(path);

var ingested = 0;

// ParquetSerializer.DeserializeAllAsync streams the file row-group by row-group.
await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionRow>(stream))
{
    graph.AddOrUpdate(new Transaction
    {
        TxId        = row.tx_id,
        Customer    = row.customer,
        AmountCents = row.amount_cents,
        // Parquet.Net yields timestamps as DateTime; mark them UTC before wrapping.
        OccurredAt  = new DateTimeOffset(DateTime.SpecifyKind(row.occurred_at, DateTimeKind.Utc)),
    });
    ingested++;
}

await graph.CommitPendingAsync();
Console.WriteLine($"Ingested {ingested} transactions from {path}");

[Node]
public class Transaction
{
    [Key]       public string         TxId        { get; set; }
    [Property]  public string         Customer    { get; set; }
    [Property]  public long           AmountCents { get; set; }
    [Timestamp] public DateTimeOffset OccurredAt  { get; set; }
}

// DTO with property names that match the Parquet column names.
class TransactionRow
{
    public string         tx_id        { get; set; }
    public string         customer     { get; set; }
    public long           amount_cents { get; set; }
    public DateTime       occurred_at  { get; set; }
}

How it works

ParquetSerializer.DeserializeAllAsync<T>(stream) opens the file, reads one row group at a time, materializes each row into the DTO, and yields it. Only one row group's worth of data is in memory at any moment, typically 64–512 MB depending on writer settings.
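
To see the row-group structure of a file for yourself, the lower-level ParquetReader exposes the group count and per-group row counts. A quick diagnostic sketch, assuming Parquet.Net 4.x; the file name is illustrative:

using Parquet;

using var fileStream = File.OpenRead("transactions.parquet");
using var reader = await ParquetReader.CreateAsync(fileStream);

Console.WriteLine($"Row groups: {reader.RowGroupCount}");
for (int i = 0; i < reader.RowGroupCount; i++)
{
    using var rowGroup = reader.OpenRowGroupReader(i);
    Console.WriteLine($"  group {i}: {rowGroup.RowCount:N0} rows");
}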

The DTO uses lowercase/snake_case property names to match Parquet conventions. The [Node] POCO keeps idiomatic C# names. If your Parquet writer produces PascalCase columns, you can drop the DTO and deserialize directly into the [Node] type.
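
There is a middle ground: recent Parquet.Net versions let ParquetSerializer map a C# property to a differently named Parquet column via System.Text.Json's [JsonPropertyName] attribute, so the DTO can keep idiomatic names. A sketch, to be verified against your Parquet.Net version:

using System.Text.Json.Serialization;

class TransactionRowMapped
{
    [JsonPropertyName("tx_id")]        public string   TxId        { get; set; }
    [JsonPropertyName("customer")]     public string   Customer    { get; set; }
    [JsonPropertyName("amount_cents")] public long     AmountCents { get; set; }
    [JsonPropertyName("occurred_at")]  public DateTime OccurredAt  { get; set; }
}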

Reading specific columns only

For wide schemas you usually want only a subset of columns. Parquet's columnar layout means unused columns are never read from disk; you just declare a DTO containing only the columns you want:

class TransactionSlim
{
    public string tx_id        { get; set; }
    public long   amount_cents { get; set; }
}

await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionSlim>(stream))
{
    // only tx_id and amount_cents are read from disk
}

This is what makes Parquet faster than CSV/JSON on wide rows: the file is laid out in columns, so unread columns cost nothing.

Notes & pitfalls

  • Timestamp types. Parquet has multiple time encodings (TIMESTAMP_MILLIS, TIMESTAMP_MICROS, INT96). Parquet.Net surfaces them as DateTime (UTC). Wrap in DateTimeOffset for storage, since the workspace's [Timestamp] expects DateTimeOffset.
  • Nullable columns. A nullable Parquet column maps to T? in C# (long? amount_cents). Don't make it non-nullable or you'll get NullReferenceException mid-stream. See the first sketch after this list.
  • Logical types. Decimals, dates, and UUIDs are logical types on top of physical types. Parquet.Net handles DECIMAL → decimal correctly; check its type-mapping table for the rest.
  • Row group size vs auto-commit. If row groups are large (say 1M rows each), set SetAutoCommitCost(everyNodes: 50_000) or higher to amortize the HTTP round-trip across more rows.
  • Partitioned directories. A Spark/Athena/Hive export usually produces many part-*.parquet files in a directory. Glob them with Directory.EnumerateFiles(path, "*.parquet") and run the same loop per file, as in the second sketch below.
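
For the nullable-column pitfall, a sketch of a null-tolerant DTO and loop; how you handle a missing amount (skip, default, flag) is up to you:

class TransactionNullable
{
    public string tx_id        { get; set; }
    public long?  amount_cents { get; set; }  // nullable column -> nullable C# type
}

using var nstream = File.OpenRead("transactions.parquet");
await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionNullable>(nstream))
{
    if (row.amount_cents is null) continue;   // skip rows with no amount
    // ... ingest row.tx_id with row.amount_cents.Value as before
}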
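
And for partitioned directories, the per-file loop reuses the graph, Transaction, and TransactionRow from the connector code; the directory path is illustrative, and SearchOption.AllDirectories also catches Hive-style nested partition folders (e.g. date=2024-01-01/part-0000.parquet):

var dir = "exports/transactions/";

foreach (var file in Directory.EnumerateFiles(dir, "*.parquet", SearchOption.AllDirectories))
{
    using var partStream = File.OpenRead(file);
    await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionRow>(partStream))
    {
        graph.AddOrUpdate(new Transaction
        {
            TxId        = row.tx_id,
            Customer    = row.customer,
            AmountCents = row.amount_cents,
            OccurredAt  = new DateTimeOffset(DateTime.SpecifyKind(row.occurred_at, DateTimeKind.Utc)),
        });
    }
}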

Notes on Parquet.Net versions

Parquet.Net 4.x introduced the ParquetSerializer shown above. On 3.x the API is lower-level (ParquetReader + Column arrays). If you're stuck on 3.x, upgrade — the 4.x serializer is much cleaner.
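
For reference, here is what the lower-level path looks like in 4.x terms (3.x has the same shape but different entry points): columns come back as typed arrays per row group rather than as objects.

using Parquet;
using Parquet.Data;
using Parquet.Schema;

using var fs = File.OpenRead("transactions.parquet");
using var reader = await ParquetReader.CreateAsync(fs);
DataField[] fields = reader.Schema.GetDataFields();

for (int g = 0; g < reader.RowGroupCount; g++)
{
    using ParquetRowGroupReader rowGroup = reader.OpenRowGroupReader(g);
    foreach (DataField field in fields)
    {
        DataColumn column = await rowGroup.ReadColumnAsync(field);
        // column.Data is a typed Array holding this column's values for the whole group
    }
}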
