Python Embedded
Run Traverse as an in-process graph database in Python applications.
Install
pip install traverse
Note: The traverse package includes native bindings via PyO3. Wheels are available for Windows, Linux, and macOS.
Open a Database
import traverse
# Open or create a database (file is created on first flush)
db = traverse.open("mydb.tvdb")
# Open with a license key (validates via Traverse API)
db = traverse.open_licensed("mydb.tvdb", "tskey_...")
The Database object supports context managers for automatic cleanup:
with traverse.open("mydb.tvdb") as db:
# use db
pass # auto-closed on exit
Run Queries
with db.begin() as tx:
result = tx.execute("MATCH (n:Person) RETURN n.name, n.age")
for row in result:
print(row["n.name"], row["n.age"])
Query Parameters
with db.begin() as tx:
result = tx.execute(
"MATCH (n:Person) WHERE n.age > $min_age RETURN n.name",
{"min_age": 25}
)
for row in result:
print(row["n.name"])
Write Data
with db.begin() as tx:
tx.execute("CREATE (p:Person {name: 'Alice', age: 30})")
tx.execute("CREATE (p:Person {name: 'Bob', age: 25})")
tx.commit()
Note: If you use the with statement, the transaction auto-commits on clean exit and auto-aborts on exception. You can also call commit() or abort() explicitly.
Transactions
# Explicit transaction management
tx = db.begin()
try:
tx.execute("CREATE (p:Person {name: 'Carol'})")
tx.execute(
"MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Carol'}) "
"CREATE (a)-[:KNOWS]->(b)"
)
tx.commit()
except Exception:
tx.abort()
raise
QueryResult
with db.begin() as tx:
result = tx.execute("MATCH (n:Person) RETURN n.name, n.age")
print(result.columns) # ['n.name', 'n.age']
print(result.row_count) # number of rows
# Iterate rows (each row is a dict)
for row in result:
print(row["n.name"])
# Get all rows as a list of dicts
rows = result.data()
# Mutation statistics
result = tx.execute("CREATE (n:Test)")
print(result.stats) # {'nodes_created': 1, 'labels_added': 1}
Bulk Writer
The BulkWriter bypasses MVCC for high-throughput imports (2–5M rows/s).
writer = db.bulk_writer()
nid1 = writer.create_node(["Person"], {"name": "Alice", "age": 30})
nid2 = writer.create_node(["Person"], {"name": "Bob", "age": 25})
writer.create_edge(nid1, nid2, "KNOWS", {"since": 2024})
writer.commit()
Or use as a context manager:
with db.bulk_writer() as writer:
nid1 = writer.create_node(["Person"], {"name": "Alice", "age": 30})
nid2 = writer.create_node(["Person"], {"name": "Bob", "age": 25})
writer.create_edge(nid1, nid2, "KNOWS", {"since": 2024})
# auto-commits on clean exit, auto-aborts on exception
Indexes
Create indexes via the bulk writer or Cypher:
# Via BulkWriter
with db.bulk_writer() as writer:
writer.create_index("idx_person_name", "Person", "name")
writer.create_edge_index("idx_knows_since", "KNOWS", "since")
writer.create_edge_index("idx_knows", "KNOWS") # type-only index
# Via Cypher
with db.begin() as tx:
tx.execute("CREATE INDEX idx_person_age FOR (n:Person) ON (n.age)")
tx.commit()
Flush and Close
# Flush dirty pages to disk
db.flush()
# Close the database
db.close()
Note: Traverse is an in-memory database. Call flush() to persist data to the .tvdb file. Data written since the last flush is lost if the process crashes.