Skip to main content

Storage Adapters

Chaos Cypher uses the Hexagonal Architecture (Ports and Adapters) pattern for data persistence. Services depend on protocol interfaces (ports), and concrete implementations (adapters) are injected at runtime. This design enables storage-agnostic business logic and straightforward testing.

Architecture

Storage Protocols

All storage protocols are defined in chaoscypher_core.ports and share a key rule:

Protocols Return Dicts, Always

Every storage protocol method returns dict[str, Any] or list[dict[str, Any]]. Code consuming storage data must use dict access (data["key"] or data.get("key")), never attribute access (data.field).

Available Protocols

Each protocol lives in its own file under chaoscypher_core.ports. Storage protocols follow a per-feature naming convention (storage_<feature>.py).

Per-feature storage protocols:

ProtocolModulePurpose
WorkflowStorageProtocolports.storage_workflowsWorkflow definitions, steps, statistics
WorkflowExecutionStorageProtocolports.storage_workflow_executionsWorkflow execution tracking
ToolStorageProtocolports.storage_toolsSystem and user tool registry
SourceStorageProtocolports.storage_sourcesUnified source lifecycle (upload through commit)
ChatStorageProtocolports.storage_chatsChat history and messages
TriggerStorageProtocolports.storage_triggersEvent triggers and execution history
LLMMetricsStorageProtocolports.storage_llm_metricsLLM call metrics and cost tracking
ChunkStorageProtocolports.storage_chunksDocument chunk persistence
CitationStorageProtocolports.storage_citationsCitation tracking
EntityEmbeddingStorageProtocolports.storage_embeddingsEntity embedding vectors
SourceTagStorageProtocolports.storage_source_tagsSource tag management
ExtractionQueueStorageProtocolports.storage_extraction_queueExtraction job queue state
ExtractionSubmissionStorageProtocolports.storage_extraction_submissionsExtraction submission tracking
GraphSnapshotStorageProtocolports.storage_graph_snapshotGraph snapshot staleness and breakdown queries

Domain-specific protocols:

ProtocolModulePurpose
ChunkingProtocolports.chunkHierarchical document chunk storage
IndexingProtocolports.indexChunk embedding storage and retrieval
GraphRepositoryProtocolports.graphNode, edge, and template CRUD
SearchRepositoryProtocolports.searchKeyword, vector, and hybrid search
SearchRetryQueueProtocolports.search_retryRetry queue for failed search index operations
StructuredExtractorPortports.structured_extractionLLM-backed structured extraction
EmbeddingProviderProtocolports.embeddingEmbedding model provider
TransactionalAdapterProtocolports.transactionalUnit-of-work transaction support

SqliteAdapter (Default)

SqliteAdapter is the built-in storage adapter. It implements all core storage protocols using SQLite and SQLModel.

sqlite-vec extension

SqliteAdapter loads the sqlite-vec extension on every SQLite connection to support vector search operations. Custom adapter authors who need vector search should be aware of this dependency — if you are writing your own adapter, you will need to provide an equivalent vector storage mechanism for your backend.

Engine shortcut

For most use cases, Engine handles adapter creation and wiring automatically. Use SqliteAdapter directly only when you need low-level storage access or are building a custom integration.

from chaoscypher_core import Engine

with Engine("./data/databases/default") as engine:
engine.storage_adapter # Pre-wired SqliteAdapter instance

Setup

For advanced or custom usage, you can create the adapter directly:

from chaoscypher_core import SqliteAdapter

# Option 1: Directory path (database name inferred from directory name)
adapter = SqliteAdapter("data/databases/default")
adapter.connect()

# Option 2: Explicit file path
adapter = SqliteAdapter(db_path="data/databases/default/app.db")
adapter.connect()

# ... use adapter ...

adapter.disconnect()

# Option 3: Context manager (recommended)
with SqliteAdapter("data/databases/default") as adapter:
workflows = adapter.list_workflows("default")

Constructor

SqliteAdapter(
db_path: str, # Path to SQLite database file, or a directory path
database_name: str | None = None, # Derived from parent directory if omitted
)

db_path accepts either a full path to the SQLite file (e.g., data/databases/mydb/app.db) or a directory path (e.g., data/databases/mydb). When a directory is provided, the adapter appends app.db automatically. If database_name is not provided, it is inferred from the directory name. For example, both data/databases/mydb/app.db and data/databases/mydb yield database_name="mydb".

Using with Services

Services accept protocol-typed parameters, so SqliteAdapter can be passed directly where it satisfies the protocol:

from chaoscypher_core import ChunkingService, EngineSettings
from chaoscypher_core import SqliteAdapter

settings = EngineSettings(current_database="default")

with SqliteAdapter("data/databases/default") as adapter:
# SqliteAdapter implements ChunkingProtocol (via SourceChunksMixin)
chunking = ChunkingService(settings=settings, repository=adapter)
result = await chunking.create_chunks("Document text...")
print(f"{result.total_small_chunks} chunks in {result.total_groups} groups")
# Persist chunks to storage (separate step)
chunking.store_chunks(result)

Mixin Architecture

SqliteAdapter composes functionality from focused mixins, each implementing one or more storage protocols:

MixinProtocols Covered
WorkflowsMixinWorkflow CRUD
WorkflowExecutionsMixinExecution tracking
ToolsMixinTool registry
SourceLifecycleMixinSource file upload and lifecycle
SourceIndexingMixinEmbedding storage, extraction gating
SourceExtractionJobsMixinExtraction job management
SourceChunkTasksMixinChunk task analytics
SourcesMixinCore source CRUD
SourceTagsMixinTag management
SourceChunksMixinDocument chunk operations
SourceCitationsMixinCitation tracking
ChatsMixinChat history
TriggersMixinEvent triggers
LLMMetricsMixinLLM call metrics

Mixins live in chaoscypher_core.adapters.sqlite.mixins/ and inherit from SqliteMixinBase which provides shared utilities like entity-to-dict conversion.

Transaction Support

Use the adapter.transaction() context manager to wrap multiple writes atomically. It is nestable — an inner transaction() participates in the enclosing one rather than opening a new SQLite transaction.

with SqliteAdapter(db_path="data/app.db") as adapter:
with adapter.transaction():
adapter.create_workflow({"id": "wf_1", "name": "Test", ...})
adapter.create_workflow_step({"id": "step_1", ...})
# Both writes committed atomically when the context exits normally.
# Any exception rolls back both.

Internally, mixin methods call self._maybe_commit() instead of session.commit() directly. Outside a transaction() block this flushes and commits immediately; inside it only flushes, deferring the commit to the outermost block. This is the Unit of Work pattern — you never need to manage transactions manually in repository code.

Creating a Custom Adapter

To implement a custom storage backend (PostgreSQL, MongoDB, etc.), implement the protocol interfaces that your services need.

Step 1: Choose Protocols

Determine which protocols your adapter needs. You do not need to implement all of them -- only the ones required by the services you use.

from chaoscypher_core import ChunkingProtocol, IndexingProtocol

Step 2: Implement the Protocol

from typing import Any
from chaoscypher_core import ChunkingProtocol


class PostgresChunkingAdapter:
"""PostgreSQL implementation of ChunkingProtocol."""

def __init__(self, connection_string: str):
self.connection_string = connection_string

def store_chunks_and_groups(
self,
small_chunks: list[dict[str, Any]],
hierarchical_groups: list[dict[str, Any]],
batch_size: int = 500,
) -> None:
# Insert chunks and group metadata into PostgreSQL
...

def get_small_chunks(self, source_id: str) -> list[dict[str, Any]]:
# Query chunks from PostgreSQL, return as dicts
...

def get_hierarchical_groups(self, source_id: str) -> list[dict[str, Any]]:
# Query hierarchical groups from PostgreSQL
...

def update_chunk_status(self, source_id: str, status: str) -> int:
# Update chunk status, return count
...

Step 3: Use with Services

Because Python protocols use structural typing, your adapter works with any service that expects ChunkingProtocol without explicit inheritance:

from chaoscypher_core import ChunkingService

pg_adapter = PostgresChunkingAdapter("postgresql://localhost/chaoscypher")
service = ChunkingService(settings=settings, repository=pg_adapter)

Step 4: Combine Multiple Protocols

For a full adapter, use the mixin pattern (like SqliteAdapter) or implement all methods on a single class:

class PostgresAdapter:
"""Full PostgreSQL adapter implementing multiple protocols."""

def __init__(self, connection_string: str):
self.conn = connect(connection_string)

# ChunkingProtocol methods
def store_chunks_and_groups(self, ...): ...
def get_small_chunks(self, ...): ...
def get_hierarchical_groups(self, ...): ...
def update_chunk_status(self, ...): ...

# IndexingProtocol methods
def get_chunks_by_source(self, ...): ...
def update_chunk_embedding(self, ...): ...
def get_chunk_by_id(self, ...): ...
def delete_chunks_by_source(self, ...): ...
def update_chunk_source(self, ...): ...

Testing with Mock Adapters

For testing, create lightweight mock adapters using in-memory data:

from typing import Any


class MockChunkingAdapter:
"""In-memory mock for testing ChunkingService."""

def __init__(self):
self.chunks: dict[str, list[dict[str, Any]]] = {}
self.groups: dict[str, list[dict[str, Any]]] = {}

def store_chunks_and_groups(
self,
small_chunks: list[dict[str, Any]],
hierarchical_groups: list[dict[str, Any]],
batch_size: int = 500,
) -> None:
if small_chunks:
source_id = small_chunks[0]["source_id"]
self.chunks[source_id] = small_chunks
self.groups[source_id] = hierarchical_groups

def get_small_chunks(self, source_id: str) -> list[dict[str, Any]]:
return self.chunks.get(source_id, [])

def get_hierarchical_groups(self, source_id: str) -> list[dict[str, Any]]:
return self.groups.get(source_id, [])

def update_chunk_status(self, source_id: str, status: str) -> int:
chunks = self.chunks.get(source_id, [])
for chunk in chunks:
chunk["status"] = status
return len(chunks)

Use it in tests:

from chaoscypher_core import ChunkingService, EngineSettings

mock_repo = MockChunkingAdapter()
settings = EngineSettings(current_database="test")
service = ChunkingService(settings=settings, repository=mock_repo)

result = await service.create_chunks("Test document content for chunking.")

assert result.total_small_chunks > 0

# Persist if needed
service.store_chunks(result)
assert len(mock_repo.chunks["test_src"]) == result.total_small_chunks

Key Design Decisions

Why dicts instead of entities? Storage protocols return dict[str, Any] rather than ORM entities to maintain framework independence. The chaoscypher_core package has no dependency on SQLModel or any web framework. Dicts are universally portable and trivially mockable.

Why structural typing (Protocols)? Python's Protocol class enables duck typing with static type checking. Adapters do not need to inherit from a base class -- they just need to have the right method signatures. This makes it easy to create partial implementations for testing.

Why mixins? The SqliteAdapter uses 14 mixins because the full protocol surface is large. Each mixin is focused on one domain (sources, chunks, workflows, etc.), following the Interface Segregation Principle. This keeps individual files maintainable while composing into a comprehensive adapter.