# Conversation Provider Protocol

Protocol definition for conversation export adapters.

## Overview

`ConversationProvider` is a Protocol (PEP 544) that defines the interface all conversation adapters must implement. This enables type-safe, provider-agnostic code.
## API Reference

### ConversationProvider

Bases: `Protocol[ConversationT]`

Generic protocol for AI provider export parsers (per FR-151).

All adapters (OpenAI, Anthropic, Google, etc.) MUST implement this protocol. This ensures a unified interface regardless of export format, enabling library consumers to write provider-agnostic code.
**Type Parameter**

- `ConversationT`: Provider-specific conversation type (must implement `BaseConversation`). Examples: `Conversation` (OpenAI), `ClaudeConversation` (Anthropic).
**Adapter Design Principles** (per FR-113, FR-114, FR-115, FR-120):

- Stateless: no configuration parameters in `__init__`
- Reusable: the same adapter instance can process different files
- Lightweight: instantiation should be instant (no I/O)
- NOT context managers: adapters don't implement `__enter__`/`__exit__`

**Thread Safety** (per FR-098, FR-099, FR-100, FR-101):

- Adapter instances MUST be thread-safe (safe to share across threads)
- Iterators returned by methods MUST NOT be shared across threads
- Each thread MUST create its own iterator by calling the method itself
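The thread-safety contract can be pictured with a toy stand-in for `stream_conversations` (the generator below is hypothetical, not the real adapter API):

```python
import threading

def stream_items():
    """Stand-in for an adapter method that returns a fresh iterator per call."""
    yield from range(5)

results: dict[str, list[int]] = {}

def worker(name: str) -> None:
    # Each thread calls the method itself and gets its OWN iterator;
    # sharing one iterator across threads would violate the contract.
    results[name] = list(stream_items())

threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because each call produces an independent iterator, both threads see the full sequence without any locking.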
**Iterator Lifecycle** (per FR-116, FR-117, FR-118, FR-119):

- Iterators are single-use (exhausted after completion)
- Multiple calls return independent iterators (they do not resume)
- File handles are closed even if iteration stops early
- Context managers guarantee cleanup in ALL scenarios

**Resource Management** (per FR-130, FR-131, FR-132, FR-133):

- Methods use try/finally for cleanup guarantees
- File handles are managed via context managers
- Cleanup occurs on normal completion, early break, exceptions, and GC
- NO `__del__` methods for cleanup

**Backpressure** (per FR-134, FR-135, FR-136, FR-137):

- NO explicit backpressure mechanisms
- Generators yield one item at a time
- Memory usage is constant regardless of consumer speed
- The consumer controls the parsing pace (pull-based)
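The lifecycle and backpressure rules fall out of plain Python generator semantics; a minimal sketch with a toy generator:

```python
def stream_items():
    """Toy generator standing in for an adapter method."""
    yield from (1, 2, 3)

it1 = stream_items()
it2 = stream_items()  # a second call returns an independent iterator, not a resume

assert list(it1) == [1, 2, 3]
assert list(it1) == []   # single-use: exhausted after completion
assert next(it2) == 1    # it2 is unaffected; the consumer pulls at its own pace
```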
**Example**

```python
from pathlib import Path

from echomine import OpenAIAdapter, SearchQuery

adapter = OpenAIAdapter()

# List all conversations
for conversation in adapter.stream_conversations(Path("export.json")):
    print(conversation.title)

# Search with filters
query = SearchQuery(keywords=["algorithm"], limit=10)
for result in adapter.search(Path("export.json"), query):
    print(f"{result.score:.2f}: {result.conversation.title}")

# Get a specific conversation
conv = adapter.get_conversation_by_id(Path("export.json"), "conv-uuid-123")
if conv:
    print(conv.title)
```
**Requirements**

- FR-151: Generic protocol with `ConversationT` type parameter
- FR-215 to FR-221: Complete method signatures with proper types
- FR-027: All adapters must implement this protocol
#### stream_conversations

```python
stream_conversations(
    file_path: Path,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[ConversationT]
```

Stream conversations one at a time from an export file (per FR-151, FR-153).

**Memory Contract:** MUST use streaming (ijson or equivalent) to avoid loading the entire file into memory. Memory usage MUST be constant regardless of file size.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `file_path` | `Path` | Absolute path to the export file (e.g., `/path/to/conversations.json`) | *required* |
| `progress_callback` | `ProgressCallback \| None` | Optional `callback(count)` called periodically with the item count | `None` |
| `on_skip` | `OnSkipCallback \| None` | Optional `callback(conversation_id, reason)` invoked when malformed entries are skipped | `None` |
Yields:

| Name | Type | Description |
|---|---|---|
| `ConversationT` | `ConversationT` | Provider-specific conversation objects, one at a time |
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If `file_path` does not exist (per FR-049, FR-033) |
| `PermissionError` | If `file_path` is not readable (per FR-051, FR-033) |
| `ParseError` | If the export format is invalid or corrupted (per FR-036) |
| `SchemaVersionError` | If the export schema version is unsupported (per FR-036, FR-085) |
| `ValidationError` | If conversation data fails Pydantic validation (per FR-036, FR-054) |
**Exception Handling** (per FR-042, FR-045, FR-046, FR-047, FR-048):

- MUST fail fast; no retries
- MUST use context managers for file handle cleanup
- MUST include the conversation index in exception messages
- MUST NOT raise StopIteration explicitly (use `return`)

**Progress Reporting** (per FR-068, FR-069):

- `progress_callback` is called every 100 items OR 100 ms, whichever comes first
- The callback receives the current item count (not a percentage)

**Graceful Degradation** (per FR-004, FR-105):

- Malformed entries are skipped with a WARNING log
- The `on_skip` callback is invoked with the `conversation_id` and reason
- Processing continues after a skip
**Thread Safety**

This iterator MUST NOT be shared across threads (per FR-099). Each thread must call this method to get its own iterator.

**Requirements**

- FR-151: Generic return type (`Iterator[ConversationT]`)
- FR-153: Streaming memory contract
- FR-076, FR-077: Progress callback support
- FR-106, FR-107: Skip callback support

Source code in `src/echomine/models/protocols.py`
#### search

```python
search(
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[SearchResult[ConversationT]]
```

Search conversations matching the query criteria, with relevance ranking.

**Ranking Contract:** Results MUST be sorted by `relevance_score` (descending). BM25 or an equivalent algorithm MUST be used for keyword ranking (per FR-317).
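The ranking contract can be pictured with a simplified BM25 scorer over pre-tokenized documents (illustrative only; echomine's actual scoring internals may differ):

```python
import math

def bm25_scores(query_terms: list[str], docs: list[list[str]],
                k1: float = 1.5, b: float = 0.75) -> list[float]:
    """Score each tokenized document against the query with textbook BM25."""
    n = len(docs)
    avgdl = sum(len(d) for d in docs) / n  # average document length
    scores = []
    for doc in docs:
        dl = len(doc)
        score = 0.0
        for term in query_terms:
            tf = doc.count(term)                        # term frequency in doc
            df = sum(1 for d in docs if term in d)      # document frequency
            idf = math.log(1 + (n - df + 0.5) / (df + 0.5))
            score += idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * dl / avgdl))
        scores.append(score)
    return scores

docs = [
    ["algorithm", "design"],
    ["cooking", "recipes"],
    ["algorithm", "algorithm", "theory"],
]
scores = bm25_scores(["algorithm"], docs)
# Rank indices by score, descending, as the contract requires
ranked = sorted(range(len(docs)), key=lambda i: scores[i], reverse=True)
```

The document with two keyword hits outranks the single-hit one, and the non-matching document scores zero, matching the "highest relevance first" ordering the method must yield.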
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `file_path` | `Path` | Path to the export file | *required* |
| `query` | `SearchQuery` | Search parameters (keywords, title filter, date range, limit) | *required* |
| `progress_callback` | `ProgressCallback \| None` | Optional `callback(count)` for progress reporting | `None` |
| `on_skip` | `OnSkipCallback \| None` | Optional `callback(conversation_id, reason)` invoked when entries are skipped | `None` |
Yields:

| Type | Description |
|---|---|
| `SearchResult[ConversationT]` | Matched conversations with the provider-specific type, sorted by relevance (highest first) |
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If `file_path` does not exist (per FR-049) |
| `PermissionError` | If `file_path` is not readable (per FR-051) |
| `ParseError` | If the export format is invalid (per FR-036) |
| `SchemaVersionError` | If the schema version is unsupported (per FR-036, FR-085) |
| `ValidationError` | If the query or conversation data is invalid (per FR-036, FR-054, FR-055) |
**Thread Safety**

This iterator MUST NOT be shared across threads (per FR-099).

**Requirements**

- FR-152: Generic return type (`Iterator[SearchResult[ConversationT]]`)
- FR-153: Memory-efficient streaming
- FR-317 to FR-326: BM25 relevance ranking

Source code in `src/echomine/models/protocols.py`
#### get_conversation_by_id

```python
get_conversation_by_id(
    file_path: Path,
    conversation_id: str,
) -> ConversationT | None
```

Retrieve a specific conversation by UUID (per FR-151, FR-153, FR-155).

**Performance Contract:** MAY use streaming or an index lookup. For large files, consider building an in-memory index (`conversation_id` -> file offset) on the first call.
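The streaming-until-found strategy can be sketched over a list of already-parsed records (a real adapter would stream with ijson and stop as soon as the id matches; names here are illustrative):

```python
from typing import Optional

def get_by_id(records: list[dict], conversation_id: str) -> Optional[dict]:
    """Linear scan with early exit; O(1) memory, stops at the first match."""
    for rec in records:
        if rec.get("id") == conversation_id:
            return rec   # early exit: no need to scan the rest of the file
    return None          # per FR-155: None when not found, never an exception

data = [{"id": "a"}, {"id": "b"}]
found = get_by_id(data, "b")
missing = get_by_id(data, "nope")
```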
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `file_path` | `Path` | Path to the export file | *required* |
| `conversation_id` | `str` | Conversation UUID from the export | *required* |
Returns:

| Name | Type | Description |
|---|---|---|
| `ConversationT` | `ConversationT \| None` | Provider-specific conversation object if found |
| `None` | `ConversationT \| None` | If `conversation_id` is not found in the export (per FR-155) |
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If `file_path` does not exist (per FR-049) |
| `PermissionError` | If `file_path` is not readable (per FR-051) |
| `ParseError` | If the export format is invalid (per FR-036) |
| `SchemaVersionError` | If the schema version is unsupported (per FR-036, FR-085) |
**Requirements**

- FR-151: Generic return type (`Optional[ConversationT]`)
- FR-153: Memory-efficient (O(1) or streaming until found)
- FR-155: Returns `None` if not found (not an exception)

Source code in `src/echomine/models/protocols.py`
## Usage Examples

### Type-Safe Adapter Usage

Write code that works with any adapter:

```python
from pathlib import Path

from echomine.protocols import ConversationProvider


def process_export(
    adapter: ConversationProvider,  # Works with ANY adapter
    file_path: Path,
) -> int:
    """Process an export file using any provider adapter."""
    count = 0
    for conv in adapter.stream_conversations(file_path):
        print(f"{conv.title}: {len(conv.messages)} messages")
        count += 1
    return count


# Works with OpenAI
from echomine import OpenAIAdapter

count = process_export(OpenAIAdapter(), Path("chatgpt.json"))

# Works with future providers
# from echomine import ClaudeAdapter
# count = process_export(ClaudeAdapter(), Path("claude.jsonl"))
```
### Adapter Registry Pattern

Build multi-provider systems:

```python
from pathlib import Path

from echomine import OpenAIAdapter
from echomine.protocols import ConversationProvider

# Adapter registry
ADAPTERS: dict[str, type[ConversationProvider]] = {
    "openai": OpenAIAdapter,
    # Future providers:
    # "anthropic": ClaudeAdapter,
    # "google": GeminiAdapter,
}


def ingest_ai_export(provider: str, export_file: Path) -> int:
    """Ingest any AI provider export."""
    adapter_class = ADAPTERS.get(provider)
    if not adapter_class:
        raise ValueError(f"Unknown provider: {provider}")
    adapter = adapter_class()

    # The same logic works for all providers
    count = 0
    for conv in adapter.stream_conversations(export_file):
        knowledge_base.add(conv)  # `knowledge_base` is application-defined
        count += 1
    return count


# Usage
ingest_ai_export("openai", Path("chatgpt_export.json"))
# Future: ingest_ai_export("anthropic", Path("claude_export.jsonl"))
```
## Protocol Methods

### stream_conversations()

Stream all conversations from an export file.

Signature:

```python
def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[ConversationT]:
    ...
```

Required for all adapters.

### search()

Search conversations with filtering and ranking.

Signature:

```python
def search(
    self,
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[SearchResult[ConversationT]]:
    ...
```

Required for all adapters.

### get_conversation_by_id()

Retrieve a specific conversation by ID.

Signature:

```python
def get_conversation_by_id(
    self,
    file_path: Path,
    conversation_id: str,
) -> Optional[ConversationT]:
    ...
```

Required for all adapters.
## Implementing Custom Adapters

### Step 1: Define Adapter Class

```python
from pathlib import Path
from typing import Callable, Iterator, Optional

from echomine.models import Conversation, SearchQuery, SearchResult


class ClaudeAdapter:
    """Adapter for Anthropic Claude exports."""

    def stream_conversations(
        self,
        file_path: Path,
        *,
        progress_callback: Optional[Callable[[int], None]] = None,
        on_skip: Optional[Callable[[str, str], None]] = None,
    ) -> Iterator[Conversation]:
        """Stream conversations from a Claude export."""
        # Implementation here
        ...

    def search(
        self,
        file_path: Path,
        query: SearchQuery,
        *,
        progress_callback: Optional[Callable[[int], None]] = None,
        on_skip: Optional[Callable[[str, str], None]] = None,
    ) -> Iterator[SearchResult[Conversation]]:
        """Search Claude conversations."""
        # Implementation here
        ...

    def get_conversation_by_id(
        self,
        file_path: Path,
        conversation_id: str,
    ) -> Optional[Conversation]:
        """Get a Claude conversation by ID."""
        # Implementation here
        ...
```
### Step 2: Normalize Provider-Specific Data

Map provider-specific fields to the standard `Conversation` model:

```python
# Claude-specific parsing (method of ClaudeAdapter)
def _parse_claude_conversation(self, raw_data: dict) -> Conversation:
    """Parse the Claude export format."""
    return Conversation(
        id=raw_data["conversation_id"],
        title=raw_data["name"],
        created_at=self._parse_timestamp(raw_data["created_at"]),
        messages=self._parse_messages(raw_data["messages"]),
        metadata={
            "claude_model": raw_data.get("model", "unknown"),
            "claude_workspace_id": raw_data.get("workspace_id"),
        },
    )
```
### Step 3: Normalize Roles

Map provider-specific roles to standard roles:

```python
from typing import Literal


def _normalize_role(self, claude_role: str) -> Literal["user", "assistant", "system"]:
    """Normalize Claude roles to standard roles."""
    role_mapping = {
        "human": "user",
        "assistant": "assistant",
        # Claude exports don't carry a system role
    }
    return role_mapping.get(claude_role, "user")
```
### Step 4: Implement Streaming

Use generators for memory efficiency:

```python
import ijson
from pydantic import ValidationError


def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[Conversation]:
    """Stream conversations with O(1) memory."""
    with open(file_path, "rb") as f:
        parser = ijson.items(f, "item")  # Streaming parser
        count = 0
        for item in parser:
            try:
                conversation = self._parse_claude_conversation(item)
                yield conversation
                count += 1

                # Progress reporting
                if progress_callback and count % 100 == 0:
                    progress_callback(count)
            except ValidationError as e:
                # Graceful degradation: skip the malformed entry
                if on_skip:
                    on_skip(item.get("conversation_id", "unknown"), str(e))
                continue
```
### Step 5: Type Checking

Verify protocol compliance with mypy:

```python
from echomine.protocols import ConversationProvider

# This assignment verifies that ClaudeAdapter implements the protocol
adapter: ConversationProvider = ClaudeAdapter()  # Type-checks!
```
## Design Guidelines

### Stateless Design

Adapters should have no `__init__` parameters:

```python
# ✅ CORRECT: Stateless
class ClaudeAdapter:
    def stream_conversations(self, file_path: Path) -> Iterator[Conversation]:
        ...


# ❌ WRONG: Stateful
class ClaudeAdapter:
    def __init__(self, file_path: Path):  # NO!
        self.file_path = file_path
```
### Memory Efficiency

Always use streaming (generators, not lists):

```python
# ✅ CORRECT: Generator
def stream_conversations(self, file_path: Path) -> Iterator[Conversation]:
    for item in parser:
        yield conversation


# ❌ WRONG: List (loads the entire file into memory)
def stream_conversations(self, file_path: Path) -> list[Conversation]:
    return [conversation for item in parser]
```
### Error Handling

- Fail fast on unrecoverable errors (file not found, unsupported schema version)
- Degrade gracefully on data errors (skip malformed entries)
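The two error classes can be contrasted in a short sketch (names and record shapes are illustrative, not echomine APIs):

```python
from pathlib import Path
from typing import Iterator

def stream(file_path: Path, records: list[dict]) -> Iterator[dict]:
    """Toy stream: fail fast on missing files, skip malformed records."""
    if not file_path.exists():
        raise FileNotFoundError(file_path)  # unrecoverable: fail fast, no retry
    for rec in records:
        if "id" not in rec:
            continue  # data error: skip this entry, keep processing
        yield rec

# Data errors are skipped; the stream keeps going
good = list(stream(Path("."), [{"id": "a"}, {"bad": True}, {"id": "b"}]))

# Unrecoverable errors surface immediately on first pull
try:
    next(stream(Path("/no/such/file"), []))
except FileNotFoundError:
    failed_fast = True
```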
### Progress Reporting

Invoke callbacks every 100 items OR 100 ms (whichever comes first):

```python
import time

last_progress_time = time.monotonic()
count = 0
for item in parser:
    count += 1
    current_time = time.monotonic()
    if progress_callback and (count % 100 == 0 or current_time - last_progress_time >= 0.1):
        progress_callback(count)
        last_progress_time = current_time
```
## Protocol Benefits

- **Type Safety**: mypy validates adapter implementations
- **Interchangeability**: Swap providers without code changes
- **Testability**: Mock adapters for testing
- **Documentation**: Self-documenting interface
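The testability benefit can be sketched with an in-memory mock that exposes the same method surface (types simplified to dicts; this is a test fixture pattern, not part of echomine):

```python
from pathlib import Path
from typing import Iterator, Optional


class MockAdapter:
    """In-memory stand-in for a ConversationProvider, for tests that
    should not touch real export files."""

    def __init__(self) -> None:
        # Real adapters are stateless with no __init__ parameters;
        # a mock may hold canned fixtures instead of reading a file.
        self._fixtures = [{"id": "c1", "title": "Hello"}]

    def stream_conversations(self, file_path: Path) -> Iterator[dict]:
        yield from self._fixtures

    def get_conversation_by_id(self, file_path: Path,
                               conversation_id: str) -> Optional[dict]:
        return next((c for c in self._fixtures if c["id"] == conversation_id), None)


adapter = MockAdapter()
titles = [c["title"] for c in adapter.stream_conversations(Path("unused.json"))]
```

Code written against the protocol, like `process_export` above, accepts this mock unchanged.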
## Related

- OpenAI Adapter: Reference implementation
- Conversation Model: Standard conversation model
- SearchQuery: Search parameters