Python Embedded

Run Traverse as an in-process graph database in Python applications.

Install

pip install traverse

Note: The traverse package includes native bindings via PyO3. Wheels are available for Windows, Linux, and macOS.

Open a Database

import traverse

# Open or create a database (file is created on first flush)
db = traverse.open("mydb.tvdb")

# Open with a license key (validated via the Traverse API)
db = traverse.open_licensed("mydb.tvdb", "tskey_...")

The Database object can also be used as a context manager, which closes the database automatically when the block exits:

with traverse.open("mydb.tvdb") as db:
    # use db
    pass  # auto-closed on exit

Run Queries

with db.begin() as tx:
    result = tx.execute("MATCH (n:Person) RETURN n.name, n.age")
    for row in result:
        print(row["n.name"], row["n.age"])

Query Parameters

with db.begin() as tx:
    result = tx.execute(
        "MATCH (n:Person) WHERE n.age > $min_age RETURN n.name",
        {"min_age": 25}
    )
    for row in result:
        print(row["n.name"])

Write Data

with db.begin() as tx:
    tx.execute("CREATE (p:Person {name: 'Alice', age: 30})")
    tx.execute("CREATE (p:Person {name: 'Bob', age: 25})")
    tx.commit()

Note: When a transaction is used in a with statement, it auto-commits on clean exit and auto-aborts on exception, so the explicit tx.commit() in the example above is optional. Outside a with statement, call commit() or abort() explicitly.

Transactions

# Explicit transaction management
tx = db.begin()
try:
    tx.execute("CREATE (p:Person {name: 'Carol'})")
    tx.execute(
        "MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Carol'}) "
        "CREATE (a)-[:KNOWS]->(b)"
    )
    tx.commit()
except Exception:
    tx.abort()
    raise

QueryResult

with db.begin() as tx:
    result = tx.execute("MATCH (n:Person) RETURN n.name, n.age")

    print(result.columns)    # ['n.name', 'n.age']
    print(result.row_count)  # number of rows

    # Iterate rows (each row is a dict)
    for row in result:
        print(row["n.name"])

    # Get all rows as a list of dicts
    rows = result.data()

    # Mutation statistics
    result = tx.execute("CREATE (n:Test)")
    print(result.stats)  # {'nodes_created': 1, 'labels_added': 1}

Bulk Writer

The BulkWriter bypasses MVCC for high-throughput imports (2–5M rows/s).

writer = db.bulk_writer()
nid1 = writer.create_node(["Person"], {"name": "Alice", "age": 30})
nid2 = writer.create_node(["Person"], {"name": "Bob", "age": 25})
writer.create_edge(nid1, nid2, "KNOWS", {"since": 2024})
writer.commit()

Or use as a context manager:

with db.bulk_writer() as writer:
    nid1 = writer.create_node(["Person"], {"name": "Alice", "age": 30})
    nid2 = writer.create_node(["Person"], {"name": "Bob", "age": 25})
    writer.create_edge(nid1, nid2, "KNOWS", {"since": 2024})
    # auto-commits on clean exit, auto-aborts on exception

Indexes

Create indexes via the bulk writer or Cypher:

# Via BulkWriter
with db.bulk_writer() as writer:
    writer.create_index("idx_person_name", "Person", "name")
    writer.create_edge_index("idx_knows_since", "KNOWS", "since")
    writer.create_edge_index("idx_knows", "KNOWS")  # type-only index

# Via Cypher
with db.begin() as tx:
    tx.execute("CREATE INDEX idx_person_age FOR (n:Person) ON (n.age)")
    tx.commit()

Flush and Close

# Flush dirty pages to disk
db.flush()

# Close the database
db.close()

Note: Traverse is an in-memory database. Call flush() to persist data to the .tvdb file. Data written since the last flush is lost if the process crashes.