Conversation Provider Protocol

Protocol definition for conversation export adapters.

Overview

ConversationProvider is a Protocol (PEP 544) that defines the interface all conversation adapters must implement. This enables type-safe, provider-agnostic code.

API Reference

ConversationProvider

Bases: Protocol[ConversationT]

Generic protocol for AI provider export parsers (per FR-151).

All adapters (OpenAI, Anthropic, Google, etc.) MUST implement this protocol. This ensures a unified interface regardless of export format, enabling library consumers to write provider-agnostic code.

Type Parameter

ConversationT: Provider-specific conversation type (must implement BaseConversation).
Examples: Conversation (OpenAI), ClaudeConversation (Anthropic).
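
Because the protocol is generic, annotations can bind the type parameter explicitly. A minimal sketch, assuming the import paths shown in the usage examples below:

from echomine import OpenAIAdapter
from echomine.models import Conversation
from echomine.protocols import ConversationProvider

# Binding ConversationT to Conversation means type checkers infer
# Iterator[Conversation] from stream_conversations(), not a bare base type.
adapter: ConversationProvider[Conversation] = OpenAIAdapter()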

Adapter Design Principles (per FR-113, FR-114, FR-115, FR-120):
  • Stateless: No configuration parameters in __init__
  • Reusable: Same adapter instance can process different files
  • Lightweight: Instantiation should be instant (no I/O)
  • NOT context managers: Adapters don't implement __enter__/__exit__

Thread Safety (per FR-098, FR-099, FR-100, FR-101):
  • Adapter instances MUST be thread-safe (safe to share across threads)
  • Iterators returned by methods MUST NOT be shared across threads
  • Each thread MUST create its own iterator by calling methods separately

Iterator Lifecycle (per FR-116, FR-117, FR-118, FR-119):
  • Iterators are single-use (exhausted after completion)
  • Multiple calls return independent iterators (they do not resume)
  • File handles closed even if iteration stops early
  • Context managers guarantee cleanup in ALL scenarios

Resource Management (per FR-130, FR-131, FR-132, FR-133):
  • Methods use try/finally for cleanup guarantees
  • File handles managed via context managers
  • Cleanup occurs on normal completion, early break, exceptions, and GC
  • NO __del__ methods for cleanup

Backpressure (per FR-134, FR-135, FR-136, FR-137):
  • NO explicit backpressure mechanisms
  • Generators yield one item at a time
  • Memory usage constant regardless of consumer speed
  • Consumer controls parsing pace (pull-based)
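
These contracts are easiest to see from the consumer side. A short sketch, reusing the adapter and file from the example below:

from pathlib import Path
from echomine import OpenAIAdapter

adapter = OpenAIAdapter()
path = Path("export.json")

# Independent iterators: each call starts from the beginning (no resume).
it1 = adapter.stream_conversations(path)
it2 = adapter.stream_conversations(path)

# Pull-based backpressure: only what the consumer requests gets parsed.
first = next(it1)                       # it2 has not advanced
batch = [next(it2) for _ in range(10)]  # assumes the export holds at least 10

# Single-use: once exhausted, call the method again for a fresh iterator.
rest = list(it1)

# Thread rule: each thread must make these calls itself; never hand it1/it2
# to another thread.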

Example
from pathlib import Path
from echomine import OpenAIAdapter, SearchQuery

adapter = OpenAIAdapter()

# List all conversations
for conversation in adapter.stream_conversations(Path("export.json")):
    print(conversation.title)

# Search with filters
query = SearchQuery(keywords=["algorithm"], limit=10)
for result in adapter.search(Path("export.json"), query):
    print(f"{result.score:.2f}: {result.conversation.title}")

# Get specific conversation
conv = adapter.get_conversation_by_id(Path("export.json"), "conv-uuid-123")
if conv:
    print(conv.title)
Requirements
  • FR-151: Generic protocol with ConversationT type parameter
  • FR-215-221: Complete method signatures with proper types
  • FR-027: All adapters must implement this protocol

stream_conversations

stream_conversations(
    file_path: Path,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[ConversationT]

Stream conversations one at a time from export file (per FR-151, FR-153).

Memory Contract: MUST use streaming (ijson or equivalent) to avoid loading entire file into memory. Memory usage MUST be constant regardless of file size.

Parameters:

  • file_path (Path, required): Absolute path to export file (e.g., /path/to/conversations.json)
  • progress_callback (ProgressCallback | None, default None): Optional callback(count) called periodically with the item count
  • on_skip (OnSkipCallback | None, default None): Optional callback(conversation_id, reason) invoked when malformed entries are skipped

Yields:

  • ConversationT: Provider-specific conversation objects, one at a time

Raises:

  • FileNotFoundError: If file_path does not exist (per FR-049, FR-033)
  • PermissionError: If file_path is not readable (per FR-051, FR-033)
  • ParseError: If the export format is invalid or corrupted (per FR-036)
  • SchemaVersionError: If the export schema version is unsupported (per FR-036, FR-085)
  • ValidationError: If conversation data fails Pydantic validation (per FR-036, FR-054)

Exception Handling (per FR-042, FR-045, FR-046, FR-047, FR-048):
  • MUST fail fast, no retries
  • MUST use context managers for file handle cleanup
  • MUST include conversation index in exception messages
  • MUST NOT raise StopIteration explicitly (use return)

Progress Reporting (per FR-068, FR-069):
  • progress_callback called every 100 items OR 100ms, whichever comes first
  • Callback receives the current item count (not a percentage)

Graceful Degradation (per FR-004, FR-105):
  • Malformed entries skipped with a WARNING log
  • on_skip callback invoked with conversation_id and reason
  • Processing continues after the skip
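
A consumer-side sketch of both callbacks; the print formatting is illustrative:

from pathlib import Path
from echomine import OpenAIAdapter

def report_progress(count: int) -> None:
    print(f"parsed {count} conversations...")

def report_skip(conversation_id: str, reason: str) -> None:
    print(f"skipped {conversation_id}: {reason}")

adapter = OpenAIAdapter()
for conversation in adapter.stream_conversations(
    Path("export.json"),
    progress_callback=report_progress,
    on_skip=report_skip,
):
    ...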

Thread Safety

This iterator MUST NOT be shared across threads (per FR-099). Each thread must call this method to get its own iterator.

Requirements
  • FR-151: Generic return type (Iterator[ConversationT])
  • FR-153: Streaming memory contract
  • FR-076, FR-077: Progress callback support
  • FR-106, FR-107: Skip callback support
Source code in src/echomine/models/protocols.py
def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[ConversationT]:
    """Stream conversations one at a time from export file (per FR-151, FR-153).

    Memory Contract: MUST use streaming (ijson or equivalent) to avoid loading
    entire file into memory. Memory usage MUST be constant regardless of file size.

    Args:
        file_path: Absolute path to export file (e.g., /path/to/conversations.json)
        progress_callback: Optional callback(count) called periodically with item count
        on_skip: Optional callback(conversation_id, reason) when malformed entries skipped

    Yields:
        ConversationT: Provider-specific conversation objects one at a time

    Raises:
        FileNotFoundError: If file_path does not exist (per FR-049, FR-033)
        PermissionError: If file_path is not readable (per FR-051, FR-033)
        ParseError: If export format is invalid or corrupted (per FR-036)
        SchemaVersionError: If export schema version is unsupported (per FR-036, FR-085)
        ValidationError: If conversation data fails Pydantic validation (per FR-036, FR-054)

    Exception Handling (per FR-042, FR-045, FR-046, FR-047, FR-048):
        - MUST fail fast, no retries
        - MUST use context managers for file handle cleanup
        - MUST include conversation index in exception messages
        - MUST NOT raise StopIteration explicitly (use return)

    Progress Reporting (per FR-068, FR-069):
        - progress_callback called every 100 items OR 100ms, whichever comes first
        - Callback receives current item count (not percentage)

    Graceful Degradation (per FR-004, FR-105):
        - Malformed entries skipped with WARNING log
        - on_skip callback invoked with conversation_id and reason
        - Processing continues after skip

    Thread Safety:
        This iterator MUST NOT be shared across threads (per FR-099).
        Each thread must call this method to get its own iterator.

    Requirements:
        - FR-151: Generic return type (Iterator[ConversationT])
        - FR-153: Streaming memory contract
        - FR-076, FR-077: Progress callback support
        - FR-106, FR-107: Skip callback support
    """
    ...

search

search(
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[SearchResult[ConversationT]]

Search conversations matching query criteria with relevance ranking.

Ranking Contract: Results MUST be sorted by relevance_score (descending). BM25 or equivalent algorithm MUST be used for keyword ranking (FR-317).
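
The ordering contract can be checked from the consumer side. A small sketch (the score attribute name follows the example at the top of this page):

from pathlib import Path
from echomine import OpenAIAdapter, SearchQuery

adapter = OpenAIAdapter()
results = list(adapter.search(Path("export.json"), SearchQuery(keywords=["algorithm"])))

# Per the ranking contract, scores arrive in non-increasing order.
scores = [r.score for r in results]
assert scores == sorted(scores, reverse=True)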

Parameters:

  • file_path (Path, required): Path to export file
  • query (SearchQuery, required): Search parameters (keywords, title filter, date range, limit)
  • progress_callback (ProgressCallback | None, default None): Optional callback(count) for progress reporting
  • on_skip (OnSkipCallback | None, default None): Optional callback(conversation_id, reason) invoked when entries are skipped

Yields:

  • SearchResult[ConversationT]: Matched conversations with the provider-specific type, sorted by relevance (highest first)

Raises:

  • FileNotFoundError: If file_path does not exist (per FR-049)
  • PermissionError: If file_path is not readable (per FR-051)
  • ParseError: If the export format is invalid (per FR-036)
  • SchemaVersionError: If the schema version is unsupported (per FR-036, FR-085)
  • ValidationError: If the query or conversation data is invalid (per FR-036, FR-054, FR-055)

Thread Safety

This iterator MUST NOT be shared across threads (per FR-099).

Requirements
  • FR-152: Generic return type (Iterator[SearchResult[ConversationT]])
  • FR-153: Memory-efficient streaming
  • FR-317-326: BM25 relevance ranking
Source code in src/echomine/models/protocols.py
def search(
    self,
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[SearchResult[ConversationT]]:
    """Search conversations matching query criteria with relevance ranking.

    Ranking Contract: Results MUST be sorted by relevance_score (descending).
    BM25 or equivalent algorithm MUST be used for keyword ranking (FR-317).

    Args:
        file_path: Path to export file
        query: Search parameters (keywords, title filter, date range, limit)
        progress_callback: Optional callback(count) for progress reporting
        on_skip: Optional callback(conversation_id, reason) when entries skipped

    Yields:
        SearchResult[ConversationT]: Matched conversations with provider-specific type,
                                     sorted by relevance (highest first)

    Raises:
        FileNotFoundError: If file_path does not exist (per FR-049)
        PermissionError: If file_path is not readable (per FR-051)
        ParseError: If export format is invalid (per FR-036)
        SchemaVersionError: If schema version unsupported (per FR-036, FR-085)
        ValidationError: If query or conversation data invalid (per FR-036, FR-054, FR-055)

    Thread Safety:
        This iterator MUST NOT be shared across threads (per FR-099).

    Requirements:
        - FR-152: Generic return type (Iterator[SearchResult[ConversationT]])
        - FR-153: Memory-efficient streaming
        - FR-317-326: BM25 relevance ranking
    """
    ...

get_conversation_by_id

get_conversation_by_id(
    file_path: Path, conversation_id: str
) -> ConversationT | None

Retrieve specific conversation by UUID (per FR-151, FR-153, FR-155).

Performance Contract: MAY use streaming or index lookup. For large files, consider building an in-memory index (conversation_id -> file offset) on first call.
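
A minimal streaming-until-found sketch of this method that satisfies FR-153 and FR-155 (one possible implementation, without the optional offset index):

def get_conversation_by_id(self, file_path: Path, conversation_id: str) -> ConversationT | None:
    # Stream lazily and stop at the first match: constant memory.
    for conversation in self.stream_conversations(file_path):
        if conversation.id == conversation_id:
            return conversation
    return None  # per FR-155: a missing ID is not an error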

Parameters:

  • file_path (Path, required): Path to export file
  • conversation_id (str, required): Conversation UUID from export

Returns:

  • ConversationT: Provider-specific conversation object if found
  • None: If conversation_id is not found in the export (per FR-155)

Raises:

  • FileNotFoundError: If file_path does not exist (per FR-049)
  • PermissionError: If file_path is not readable (per FR-051)
  • ParseError: If the export format is invalid (per FR-036)
  • SchemaVersionError: If the schema version is unsupported (per FR-036, FR-085)

Requirements
  • FR-151: Generic return type (Optional[ConversationT])
  • FR-153: Memory-efficient (O(1) or streaming until found)
  • FR-155: Returns None if not found (not exception)
Source code in src/echomine/models/protocols.py
def get_conversation_by_id(
    self,
    file_path: Path,
    conversation_id: str,
) -> ConversationT | None:
    """Retrieve specific conversation by UUID (per FR-151, FR-153, FR-155).

    Performance Contract: MAY use streaming or index lookup. For large files,
    consider building an in-memory index (conversation_id -> file offset) on first call.

    Args:
        file_path: Path to export file
        conversation_id: Conversation UUID from export

    Returns:
        ConversationT: Provider-specific conversation object if found
        None: If conversation_id not found in export (per FR-155)

    Raises:
        FileNotFoundError: If file_path does not exist (per FR-049)
        PermissionError: If file_path is not readable (per FR-051)
        ParseError: If export format is invalid (per FR-036)
        SchemaVersionError: If schema version unsupported (per FR-036, FR-085)

    Requirements:
        - FR-151: Generic return type (Optional[ConversationT])
        - FR-153: Memory-efficient (O(1) or streaming until found)
        - FR-155: Returns None if not found (not exception)
    """
    ...

Usage Examples

Type-Safe Adapter Usage

Write code that works with any adapter:

from echomine.protocols import ConversationProvider
from pathlib import Path

def process_export(
    adapter: ConversationProvider,  # Works with ANY adapter
    file_path: Path
) -> int:
    """Process export file using any provider adapter."""
    count = 0
    for conv in adapter.stream_conversations(file_path):
        print(f"{conv.title}: {len(conv.messages)} messages")
        count += 1
    return count


# Works with OpenAI
from echomine import OpenAIAdapter
count = process_export(OpenAIAdapter(), Path("chatgpt.json"))

# Works with future providers
# from echomine import ClaudeAdapter
# count = process_export(ClaudeAdapter(), Path("claude.jsonl"))

Adapter Registry Pattern

Build multi-provider systems:

from pathlib import Path

from echomine import OpenAIAdapter
from echomine.protocols import ConversationProvider

# Adapter registry
ADAPTERS: dict[str, type[ConversationProvider]] = {
    "openai": OpenAIAdapter,
    # Future providers:
    # "anthropic": ClaudeAdapter,
    # "google": GeminiAdapter,
}

def ingest_ai_export(provider: str, export_file: Path) -> int:
    """Ingest any AI provider export."""
    adapter_class = ADAPTERS.get(provider)
    if not adapter_class:
        raise ValueError(f"Unknown provider: {provider}")

    adapter = adapter_class()

    # Same logic works for all providers!
    count = 0
    for conv in adapter.stream_conversations(export_file):
        knowledge_base.add(conv)  # knowledge_base: your downstream store, defined elsewhere
        count += 1

    return count


# Usage
ingest_ai_export("openai", Path("chatgpt_export.json"))
# Future: ingest_ai_export("anthropic", Path("claude_export.jsonl"))

Protocol Methods

stream_conversations()

Stream all conversations from export file.

Signature:

def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[ConversationT]:
    ...

Required for all adapters.

search()

Search conversations with filtering and ranking.

Signature:

def search(
    self,
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[SearchResult[ConversationT]]:
    ...

Required for all adapters.

get_conversation_by_id()

Retrieve specific conversation by ID.

Signature:

def get_conversation_by_id(
    self,
    file_path: Path,
    conversation_id: str,
) -> Optional[ConversationT]:
    ...

Required for all adapters.

Implementing Custom Adapters

Step 1: Define Adapter Class

from typing import Callable, Iterator, Optional
from pathlib import Path
from echomine.models import Conversation, SearchQuery, SearchResult

class ClaudeAdapter:
    """Adapter for Anthropic Claude exports."""

    def stream_conversations(
        self,
        file_path: Path,
        *,
        progress_callback: Optional[Callable[[int], None]] = None,
        on_skip: Optional[Callable[[str, str], None]] = None,
    ) -> Iterator[Conversation]:
        """Stream conversations from Claude export."""
        # Implementation here
        pass

    def search(
        self,
        file_path: Path,
        query: SearchQuery,
        *,
        progress_callback: Optional[Callable[[int], None]] = None,
        on_skip: Optional[Callable[[str, str], None]] = None,
    ) -> Iterator[SearchResult[Conversation]]:
        """Search Claude conversations."""
        # Implementation here
        pass

    def get_conversation_by_id(
        self,
        file_path: Path,
        conversation_id: str,
    ) -> Optional[Conversation]:
        """Get Claude conversation by ID."""
        # Implementation here
        pass

Step 2: Normalize Provider-Specific Data

Map provider-specific fields to standard Conversation model:

# Claude-specific parsing
def _parse_claude_conversation(self, raw_data: dict) -> Conversation:
    """Parse Claude export format."""
    return Conversation(
        id=raw_data["conversation_id"],
        title=raw_data["name"],
        created_at=self._parse_timestamp(raw_data["created_at"]),
        messages=self._parse_messages(raw_data["messages"]),
        metadata={
            "claude_model": raw_data.get("model", "unknown"),
            "claude_workspace_id": raw_data.get("workspace_id"),
        }
    )

Step 3: Normalize Roles

Map provider-specific roles to standard roles:

from typing import Literal

def _normalize_role(self, claude_role: str) -> Literal["user", "assistant", "system"]:
    """Normalize Claude roles to standard roles."""
    role_mapping = {
        "human": "user",
        "assistant": "assistant",
        # Claude doesn't have system role
    }
    return role_mapping.get(claude_role, "user")

Step 4: Implement Streaming

Use generators for memory efficiency:

import ijson
from pydantic import ValidationError

def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[Conversation]:
    """Stream conversations with O(1) memory."""
    with open(file_path, "rb") as f:
        parser = ijson.items(f, "item")  # Streaming parser
        count = 0

        for item in parser:
            try:
                conversation = self._parse_claude_conversation(item)
                yield conversation
                count += 1

                # Progress reporting (item-count trigger only; the 100ms
                # time-based trigger from FR-068 is shown in Design Guidelines)
                if progress_callback and count % 100 == 0:
                    progress_callback(count)

            except ValidationError as e:
                # Graceful degradation
                if on_skip:
                    on_skip(item.get("conversation_id", "unknown"), str(e))
                continue

Step 5: Type Checking

Verify protocol compliance with mypy:

from echomine.protocols import ConversationProvider

# This line verifies ClaudeAdapter implements the protocol
adapter: ConversationProvider = ClaudeAdapter()  # Type-checks!

Design Guidelines

Stateless Design

Adapters should have no __init__ parameters:

# ✅ CORRECT: Stateless
class ClaudeAdapter:
    def stream_conversations(self, file_path: Path) -> Iterator[Conversation]:
        pass

# ❌ WRONG: Stateful
class ClaudeAdapter:
    def __init__(self, file_path: Path):  # NO!
        self.file_path = file_path

Memory Efficiency

Always use streaming (generators, not lists):

# ✅ CORRECT: Generator (parses and yields one conversation at a time)
def stream_conversations(self, file_path: Path) -> Iterator[Conversation]:
    for item in parser:
        yield self._parse_claude_conversation(item)

# ❌ WRONG: List (parses the entire file before returning anything)
def stream_conversations(self, file_path: Path) -> list[Conversation]:
    return [self._parse_claude_conversation(item) for item in parser]

Error Handling

  • Fail fast on unrecoverable errors (file not found, unsupported version)
  • Graceful degradation on data errors (skip malformed entries); both patterns are sketched below
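
A sketch of the split, where read_items and _parse stand in for provider-specific parsing (both hypothetical):

from pathlib import Path
from pydantic import ValidationError

def stream(file_path: Path, on_skip=None):
    # Fail fast: unrecoverable setup errors propagate immediately, no retries.
    if not file_path.exists():
        raise FileNotFoundError(file_path)

    with open(file_path, "rb") as f:
        for index, item in enumerate(read_items(f)):  # read_items: hypothetical
            try:
                yield _parse(item)                    # _parse: hypothetical
            except ValidationError as exc:
                # Graceful degradation: skip the malformed entry, keep going.
                if on_skip:
                    on_skip(item.get("conversation_id", f"index {index}"), str(exc))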

Progress Reporting

Invoke callbacks every 100 items OR 100ms (whichever comes first):

import time

last_progress_time = time.monotonic()
count = 0

for item in parser:
    count += 1
    current_time = time.monotonic()

    if progress_callback and (count % 100 == 0 or current_time - last_progress_time >= 0.1):
        progress_callback(count)
        last_progress_time = current_time

Protocol Benefits

  1. Type Safety: mypy validates adapter implementations
  2. Interchangeability: Swap providers without code changes
  3. Testability: Mock adapters for testing (see the sketch below)
  4. Documentation: Self-documenting interface
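
Because the protocol is structural (PEP 544), a test double needs no inheritance. A minimal fake, with conversation construction elided:

from pathlib import Path
from typing import Iterator, Optional

from echomine.models import Conversation, SearchQuery, SearchResult

class FakeAdapter:
    """In-memory stand-in that satisfies ConversationProvider structurally."""

    def stream_conversations(self, file_path: Path, *, progress_callback=None, on_skip=None) -> Iterator[Conversation]:
        yield from []  # replace with canned Conversation objects in real tests

    def search(self, file_path: Path, query: SearchQuery, *, progress_callback=None, on_skip=None) -> Iterator[SearchResult[Conversation]]:
        yield from []

    def get_conversation_by_id(self, file_path: Path, conversation_id: str) -> Optional[Conversation]:
        return None

# Reuses process_export() from the usage examples above:
# count = process_export(FakeAdapter(), Path("unused.json"))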

See Also