OpenAI Adapter

Adapter for parsing ChatGPT conversation exports.

Overview

The OpenAIAdapter implements the ConversationProvider protocol for OpenAI (ChatGPT) conversation exports. It provides stateless, streaming-based parsing with O(1) memory usage.

API Reference

OpenAIAdapter

Adapter for streaming OpenAI conversation exports.

This adapter uses ijson to stream-parse OpenAI ChatGPT export files with O(1) memory complexity. Conversations are yielded one at a time, enabling processing of arbitrarily large export files on modest hardware.

Memory Model
  • Streaming parser state: ~10-50MB (ijson buffer)
  • Per-conversation overhead: ~5MB (metadata + message tree)
  • Total working set: <100MB regardless of file size
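
The bounded working set can be sanity-checked with the standard library's tracemalloc (a sketch; absolute numbers vary by platform and export contents, and tracemalloc only tracks Python-level allocations):

import tracemalloc
from pathlib import Path
from echomine.adapters import OpenAIAdapter

adapter = OpenAIAdapter()
tracemalloc.start()

for conversation in adapter.stream_conversations(Path("export.json")):
    pass  # replace with real processing

_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Peak traced memory: {peak / 1_000_000:.1f} MB")
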
Error Handling Strategy
  • FileNotFoundError: Raised for missing files (fail-fast)
  • ParseError: Raised for invalid JSON syntax (fail-fast)
  • ValidationError: Raised for schema violations (fail-fast during streaming)
  • Malformed conversations: Logged and skipped (graceful degradation)
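
The fail-fast exceptions and the skip path can be handled together (a sketch; ParseError and ValidationError are importable from echomine, as shown in the Error Handling section below):

from pathlib import Path

from echomine import ParseError, ValidationError
from echomine.adapters import OpenAIAdapter

adapter = OpenAIAdapter()

def note_skip(conversation_id: str, reason: str) -> None:
    print(f"Skipped {conversation_id}: {reason}")

try:
    for conv in adapter.stream_conversations(Path("export.json"), on_skip=note_skip):
        ...
except FileNotFoundError:
    print("Export file not found")
except ParseError as e:
    print(f"Invalid JSON: {e}")
except ValidationError as e:
    print(f"Schema violation: {e}")
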
Example
from pathlib import Path
from echomine.adapters import OpenAIAdapter

adapter = OpenAIAdapter()

# Stream all conversations (lazy iteration)
for conversation in adapter.stream_conversations(Path("export.json")):
    print(f"{conversation.title}: {conversation.message_count} messages")

# Process first N conversations only (memory-efficient)
conversations = []
for i, conv in enumerate(adapter.stream_conversations(Path("export.json"))):
    conversations.append(conv)
    if i >= 9:  # First 10 conversations
        break
Requirements
  • FR-003: O(1) memory streaming implementation
  • FR-018: Extract conversation metadata (id, title, timestamps)
  • FR-122: Use ijson for incremental JSON parsing
  • FR-281-285: Skip malformed entries with warning logs
  • SC-001: Memory usage <1GB for large exports

stream_conversations

stream_conversations(
    file_path: Path,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[Conversation]

Stream conversations from OpenAI export file with O(1) memory.

This method uses ijson to incrementally parse the export file, yielding Conversation objects one at a time. The entire file is never loaded into memory; only the conversation currently being parsed is held.

Streaming Behavior
  • Returns iterator (lazy evaluation)
  • Conversations yielded in file order
  • Parser state bounded by ijson buffer (~50MB)
  • No buffering between conversations
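
Because the returned iterator is lazy, standard itertools patterns apply; for example, islice takes the first N conversations without parsing the rest of the file:

from itertools import islice
from pathlib import Path

from echomine.adapters import OpenAIAdapter

adapter = OpenAIAdapter()
first_ten = list(islice(adapter.stream_conversations(Path("export.json")), 10))
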
Error Handling
  • Invalid JSON: Raises ParseError immediately
  • Missing file: Raises FileNotFoundError
  • Schema violations: Raises ValidationError (Pydantic)
  • Empty array: Succeeds, yields zero conversations

Parameters:

  • file_path (Path, required): Path to OpenAI export JSON file
  • progress_callback (ProgressCallback | None, default None): Optional callback invoked every 100 conversations (FR-069)
  • on_skip (OnSkipCallback | None, default None): Optional callback invoked when malformed entries are skipped (FR-107)

Yields:

  • Conversation: Conversation objects parsed from the export

Raises:

  • FileNotFoundError: If the file doesn't exist
  • ParseError: If the JSON is malformed (syntax errors)
  • ValidationError: If conversation data violates the schema

Example
# Basic usage
adapter = OpenAIAdapter()
for conv in adapter.stream_conversations(Path("export.json")):
    print(f"Conversation: {conv.title}")

# Handle errors
try:
    conversations = list(adapter.stream_conversations(path))
except ParseError as e:
    print(f"Invalid export format: {e}")
except ValidationError as e:
    print(f"Schema violation: {e}")

Memory Complexity: O(1) for file size, O(N) for single conversation
Time Complexity: O(M) where M = total conversations in file
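
Both optional callbacks can be wired in one call; their signatures, Callable[[int], None] and Callable[[str, str], None], appear in the Methods summary below (continuing the adapter from the example above):

def on_progress(count: int) -> None:
    print(f"Parsed {count:,} conversations so far")

def on_skip_entry(conversation_id: str, reason: str) -> None:
    print(f"Skipped {conversation_id}: {reason}")

for conv in adapter.stream_conversations(
    Path("export.json"),
    progress_callback=on_progress,
    on_skip=on_skip_entry,
):
    ...
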

Source code in src/echomine/adapters/openai.py
def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[Conversation]:
    """Stream conversations from OpenAI export file with O(1) memory.

    This method uses ijson to incrementally parse the export file, yielding
    Conversation objects one at a time. The entire file is NEVER loaded into
    memory - only the current conversation being parsed.

    Streaming Behavior:
        - Returns iterator (lazy evaluation)
        - Conversations yielded in file order
        - Parser state bounded by ijson buffer (~50MB)
        - No buffering between conversations

    Error Handling:
        - Invalid JSON: Raises ParseError immediately
        - Missing file: Raises FileNotFoundError
        - Schema violations: Raises ValidationError (Pydantic)
        - Empty array: Succeeds, yields zero conversations

    Args:
        file_path: Path to OpenAI export JSON file
        progress_callback: Optional callback invoked every 100 conversations (FR-069)
        on_skip: Optional callback invoked when malformed entries skipped (FR-107)

    Yields:
        Conversation objects parsed from export

    Raises:
        FileNotFoundError: If file doesn't exist
        ParseError: If JSON is malformed (syntax errors)
        ValidationError: If conversation data violates schema

    Example:
        ```python
        # Basic usage
        adapter = OpenAIAdapter()
        for conv in adapter.stream_conversations(Path("export.json")):
            print(f"Conversation: {conv.title}")

        # Handle errors
        try:
            conversations = list(adapter.stream_conversations(path))
        except ParseError as e:
            print(f"Invalid export format: {e}")
        except ValidationError as e:
            print(f"Schema violation: {e}")
        ```

    Memory Complexity: O(1) for file size, O(N) for single conversation
    Time Complexity: O(M) where M = total conversations in file
    """
    # Open file in binary mode for ijson (required for streaming)
    # FileNotFoundError raised naturally by open() if file missing
    try:
        with open(file_path, "rb") as f:
            # Stream top-level array items using ijson
            # Memory: O(1) - ijson maintains bounded buffer
            # Each "item" is a complete conversation object
            try:
                items = ijson.items(f, "item")
                count = 0  # Track for progress_callback (FR-069)

                for raw_conversation in items:
                    # Parse individual conversation
                    # Memory: O(N) where N = messages in this conversation
                    try:
                        conversation = self._parse_conversation(raw_conversation)
                        count += 1

                        # Invoke progress callback every 100 items (FR-069)
                        if progress_callback and count % 100 == 0:
                            progress_callback(count)

                        yield conversation
                    except PydanticValidationError as e:
                        # Graceful degradation: skip malformed entries (FR-281)
                        conversation_id = raw_conversation.get("id", "unknown")
                        reason = f"Validation error: {e}"

                        # Invoke on_skip callback if provided (FR-107)
                        if on_skip:
                            on_skip(conversation_id, reason)

                        # Log warning but continue processing (FR-281)
                        logger.warning(
                            "Skipped malformed conversation",
                            extra={
                                "conversation_id": conversation_id,
                                "reason": reason,
                            },
                        )
                        continue  # Skip this conversation, process next

            except ijson.JSONError as e:
                # ijson.JSONError raised for malformed JSON
                # Convert to our ParseError for consistent error handling (FR-039, FR-041)
                raise ParseError(
                    f"JSON parsing failed: {e}. "
                    f"Verify export file '{file_path}' is valid JSON from OpenAI ChatGPT."
                ) from e

    except FileNotFoundError:
        # Re-raise FileNotFoundError without wrapping
        # This is a standard Python exception, no conversion needed
        raise

search

search(
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[SearchResult[Conversation]]

Search conversations with BM25 relevance ranking.

Algorithm:

  1. Stream all conversations (O(1) memory per conversation)
  2. Apply title filter if specified (metadata-only, fast)
  3. Apply date range filter if specified
  4. Build corpus and calculate BM25 scores
  5. Rank by relevance (descending)
  6. Apply limit if specified
  7. Yield SearchResult objects one at a time
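
A sketch combining several of these filters (field names taken from the source below; the title filter is a case-insensitive substring match, and from_date/to_date are inclusive date bounds):

from datetime import date
from pathlib import Path

from echomine.adapters import OpenAIAdapter
from echomine.models import SearchQuery

adapter = OpenAIAdapter()
query = SearchQuery(
    keywords=["python", "asyncio"],
    title_filter="tutorial",     # case-insensitive substring match on titles
    from_date=date(2024, 1, 1),  # inclusive
    to_date=date(2024, 6, 30),   # inclusive
    limit=25,
)

for result in adapter.search(Path("export.json"), query):
    print(f"{result.score:.2f}  {result.conversation.title}")
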

Parameters:

  • file_path (Path, required): Path to OpenAI export file
  • query (SearchQuery, required): SearchQuery with keywords, title_filter, limit
  • progress_callback (ProgressCallback | None, default None): Optional callback invoked per conversation processed
  • on_skip (OnSkipCallback | None, default None): Optional callback for malformed entries

Yields:

  • SearchResult[Conversation]: Ranked results with relevance scores

Raises:

  • FileNotFoundError: If the file doesn't exist
  • ParseError: If the JSON is malformed

Performance
  • Memory: O(N) where N = matching conversations
  • Time: O(M) where M = total conversations in file
  • Early termination: Not implemented (stream all for BM25)
Example
adapter = OpenAIAdapter()
query = SearchQuery(keywords=["python"], limit=10)

for result in adapter.search(Path("export.json"), query):
    print(f"{result.score:.2f}: {result.conversation.title}")
Source code in src/echomine/adapters/openai.py
def search(
    self,
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: ProgressCallback | None = None,
    on_skip: OnSkipCallback | None = None,
) -> Iterator[SearchResult[Conversation]]:
    """Search conversations with BM25 relevance ranking.

    Algorithm:
    1. Stream all conversations (O(1) memory per conversation)
    2. Apply title filter if specified (metadata-only, fast)
    3. Apply date range filter if specified
    4. Build corpus and calculate BM25 scores
    5. Rank by relevance (descending)
    6. Apply limit if specified
    7. Yield SearchResult objects one at a time

    Args:
        file_path: Path to OpenAI export file
        query: SearchQuery with keywords, title_filter, limit
        progress_callback: Optional callback invoked per conversation processed
        on_skip: Optional callback for malformed entries

    Yields:
        SearchResult[Conversation] with ranked results and scores

    Raises:
        FileNotFoundError: If file doesn't exist
        ParseError: If JSON is malformed

    Performance:
        - Memory: O(N) where N = matching conversations
        - Time: O(M) where M = total conversations in file
        - Early termination: Not implemented (stream all for BM25)

    Example:
        ```python
        adapter = OpenAIAdapter()
        query = SearchQuery(keywords=["python"], limit=10)

        for result in adapter.search(Path("export.json"), query):
            print(f"{result.score:.2f}: {result.conversation.title}")
        ```
    """
    # Stream conversations and apply filters
    # Type: (conversation, filtered_messages) for snippet extraction
    conversations: list[tuple[Conversation, list[Message]]] = []
    corpus_texts: list[str] = []

    count = 0
    for conv in self.stream_conversations(file_path):
        count += 1

        # Progress callback (every 100 items per FR-069)
        if progress_callback and count % 100 == 0:
            progress_callback(count)

        # Title filter (fast metadata check)
        if query.has_title_filter():
            assert query.title_filter is not None  # Type narrowing
            if query.title_filter.lower() not in conv.title.lower():
                continue  # Skip non-matching titles

        # Date range filter
        if query.has_date_filter():
            conv_date = conv.created_at.date()

            # Check from_date (inclusive)
            if query.from_date is not None and conv_date < query.from_date:
                continue

            # Check to_date (inclusive)
            if query.to_date is not None and conv_date > query.to_date:
                continue

        # FR-006: Message count filter (streaming approach for O(1) memory)
        if query.has_message_count_filter():
            msg_count = conv.message_count

            # Check min_messages (inclusive)
            if query.min_messages is not None and msg_count < query.min_messages:
                continue

            # Check max_messages (inclusive)
            if query.max_messages is not None and msg_count > query.max_messages:
                continue

        # FR-018: Filter messages by role before text aggregation
        if query.role_filter is not None:
            filtered_messages = [m for m in conv.messages if m.role == query.role_filter]
        else:
            filtered_messages = list(conv.messages)

        # Skip conversations with no messages matching the role filter
        if query.role_filter is not None and not filtered_messages:
            continue

        # Build corpus text
        # When role_filter is set, only search in filtered message content (not title)
        # When role_filter is None, include title for metadata-based matching
        if query.role_filter is not None:
            conv_text = " ".join(m.content for m in filtered_messages)
        else:
            conv_text = f"{conv.title} " + " ".join(m.content for m in filtered_messages)

        conversations.append((conv, filtered_messages))
        corpus_texts.append(conv_text)

    # Final progress callback
    if progress_callback:
        progress_callback(count)

    # Handle empty results
    if not conversations:
        return  # Empty iterator

    # Calculate average document length for BM25
    # Use regex tokenization to match BM25Scorer's tokenization
    import re

    def tokenize_for_length(text: str) -> int:
        """Count tokens using same method as BM25Scorer."""
        text_lower = text.lower()
        count = len(re.findall(r"[a-z0-9]+", text_lower))
        count += len(re.findall(r"[^\W\d_a-z]", text_lower))
        return count

    avg_doc_length = sum(tokenize_for_length(text) for text in corpus_texts) / len(corpus_texts)

    # Initialize BM25 scorer
    scorer = BM25Scorer(corpus=corpus_texts, avg_doc_length=avg_doc_length)

    # Score all conversations
    # Type: (conversation, score, matched_message_ids, filtered_messages)
    scored_conversations: list[tuple[Conversation, float, list[str], list[Message]]] = []

    for (conv, filtered_msgs), conv_text in zip(conversations, corpus_texts):
        score = 0.0
        matched_message_ids: list[str] = []
        has_keyword_match = False
        has_phrase_match = False

        # Check keyword matches (BM25 scoring)
        if query.has_keyword_search():
            assert query.keywords is not None  # Type narrowing

            # FR-009: match_mode='all' requires ALL keywords present
            if query.match_mode == "all":
                if all_terms_present(conv_text, query.keywords, scorer):
                    # All keywords present - calculate score
                    score = scorer.score(conv_text, query.keywords)
                    matched_message_ids = self._find_matched_messages(
                        filtered_msgs, query.keywords
                    )
                    has_keyword_match = True
                # else: keywords don't all match, but may still match phrases (checked below)
            else:
                # Default 'any' mode: regular BM25 scoring
                score = scorer.score(conv_text, query.keywords)
                matched_message_ids = self._find_matched_messages(filtered_msgs, query.keywords)
                if score > 0.0:
                    has_keyword_match = True

        # Check phrase matches (exact substring matching)
        # FR-002: Multiple phrases use OR logic
        # FR-004: Phrases can be combined with keywords (OR logic)
        if query.has_phrase_search():
            assert query.phrases is not None  # Type narrowing
            if phrase_matches(conv_text, query.phrases):
                has_phrase_match = True
                # If phrase matches but no keyword score, use 1.0
                if score == 0.0:
                    score = 1.0
                # Find messages that match the phrases (from filtered messages only)
                for message in filtered_msgs:
                    if phrase_matches(message.content, query.phrases):
                        if message.id not in matched_message_ids:
                            matched_message_ids.append(message.id)

        # Skip conversations with no matches (neither keyword nor phrase)
        if not has_keyword_match and not has_phrase_match:
            # If no keywords or phrases specified, include all (title/date filter only)
            if not query.has_keyword_search() and not query.has_phrase_search():
                score = 1.0
            else:
                continue

        # FR-014: Apply exclude filter after matching, before ranking
        if query.has_exclude_keywords():
            assert query.exclude_keywords is not None  # Type narrowing
            if exclude_filter(conv_text, query.exclude_keywords, scorer):
                continue  # Skip conversations containing excluded terms

        scored_conversations.append((conv, score, matched_message_ids, filtered_msgs))

    # Handle no results after filtering
    if not scored_conversations:
        return  # Empty iterator

    # Sort conversations based on query parameters (FR-043-048)
    # FR-043a: Tie-breaking by conversation_id (ascending, lexicographic)
    # FR-043b: Stable sort (Python's sort() is stable by default)
    def get_sort_key(
        item: tuple[Conversation, float, list[str], list[Message]],
    ) -> tuple[float | str | int, str]:
        """Get sort key based on query sort_by parameter.

        Returns tuple for multi-level sorting:
        - Primary: sort_by field value
        - Secondary: conversation_id (tie-breaker, FR-043a)

        FR-046a: For date sort, use updated_at or fall back to created_at if None
        FR-047: Title sort is case-insensitive
        """
        conv, score, _, _ = item

        primary_key: float | str | int
        if query.sort_by == "score":
            # Sort by BM25 relevance score
            primary_key = score
        elif query.sort_by == "date":
            # FR-046a: Use updated_at if present, otherwise created_at
            sort_date = conv.updated_at if conv.updated_at is not None else conv.created_at
            # Convert datetime to timestamp for numeric sorting
            primary_key = sort_date.timestamp()
        elif query.sort_by == "title":
            # FR-047: Case-insensitive title sort
            primary_key = conv.title.lower()
        else:  # query.sort_by == "messages"
            # Sort by message count
            primary_key = conv.message_count

        # FR-043a: Tie-breaking by conversation_id (ascending)
        return (primary_key, conv.id)

    # Apply sorting with reverse based on sort_order (FR-044)
    reverse_sort = query.sort_order == "desc"
    scored_conversations.sort(key=get_sort_key, reverse=reverse_sort)

    # Normalize scores to [0.0, 1.0] range using BM25 normalization formula (FR-319)
    # Formula: score_normalized = score_raw / (score_raw + 1)
    # This ensures consistent score interpretation across queries
    scored_conversations = [
        (conv, score / (score + 1.0) if score > 0 else 0.0, msg_ids, msgs)
        for conv, score, msg_ids, msgs in scored_conversations
    ]

    # Apply limit (always positive integer per SearchQuery validation)
    scored_conversations = scored_conversations[: query.limit]

    # Yield SearchResult objects with snippet extraction (FR-021-025)
    for conv, score, matched_message_ids, filtered_msgs in scored_conversations:
        # Build keywords list for snippet extraction
        snippet_keywords: list[str] = []
        if query.keywords:
            snippet_keywords.extend(query.keywords)
        if query.phrases:
            snippet_keywords.extend(query.phrases)

        # Extract snippet from matched messages
        snippet, _ = extract_snippet_from_messages(
            filtered_msgs,
            snippet_keywords,
            matched_message_ids,
        )

        yield SearchResult[Conversation](
            conversation=conv,
            score=score,
            matched_message_ids=matched_message_ids,
            snippet=snippet,
        )

get_conversation_by_id

get_conversation_by_id(
    file_path: Path, conversation_id: str
) -> Conversation | None

Retrieve specific conversation by UUID (FR-155, FR-217, FR-356).

Uses streaming search for memory efficiency - O(N) time, O(1) memory. For large files with frequent ID lookups, consider building an index (see the sketch after the Performance notes below).

Parameters:

  • file_path (Path, required): Path to OpenAI export JSON file
  • conversation_id (str, required): UUID of conversation to retrieve

Returns:

  • Conversation | None: Conversation object if found, None otherwise (FR-155)

Raises:

  • FileNotFoundError: If the file doesn't exist
  • ParseError: If the JSON is malformed

Example
adapter = OpenAIAdapter()
conv = adapter.get_conversation_by_id(
    Path("export.json"),
    "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
)
if conv:
    print(f"Found: {conv.title}")
else:
    print("Conversation not found")
Performance
  • Time: O(N) where N = conversations in file (streaming search)
  • Memory: O(1) for file size, O(M) for single conversation
  • Early termination: Returns immediately when match found
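
As the note above suggests, repeated lookups against the same file are better served by a one-pass index. A minimal sketch (build_id_index is a hypothetical helper, not part of the adapter; the Conversation import path is assumed to mirror SearchQuery's; this trades the O(1) streaming footprint for holding all conversations in memory):

from pathlib import Path

from echomine.adapters import OpenAIAdapter
from echomine.models import Conversation

def build_id_index(adapter: OpenAIAdapter, file_path: Path) -> dict[str, Conversation]:
    # One streaming pass; every subsequent lookup is O(1)
    return {conv.id: conv for conv in adapter.stream_conversations(file_path)}

index = build_id_index(OpenAIAdapter(), Path("export.json"))
conv = index.get("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
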
Source code in src/echomine/adapters/openai.py
def get_conversation_by_id(
    self,
    file_path: Path,
    conversation_id: str,
) -> Conversation | None:
    """Retrieve specific conversation by UUID (FR-155, FR-217, FR-356).

    Uses streaming search for memory efficiency - O(N) time, O(1) memory.
    For large files with frequent ID lookups, consider building an index.

    Args:
        file_path: Path to OpenAI export JSON file
        conversation_id: UUID of conversation to retrieve

    Returns:
        Conversation object if found, None otherwise (FR-155)

    Raises:
        FileNotFoundError: If file doesn't exist
        ParseError: If JSON is malformed

    Example:
        ```python
        adapter = OpenAIAdapter()
        conv = adapter.get_conversation_by_id(
            Path("export.json"),
            "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        )
        if conv:
            print(f"Found: {conv.title}")
        else:
            print("Conversation not found")
        ```

    Performance:
        - Time: O(N) where N = conversations in file (streaming search)
        - Memory: O(1) for file size, O(M) for single conversation
        - Early termination: Returns immediately when match found
    """
    # Stream conversations and return first match
    for conversation in self.stream_conversations(file_path):
        if conversation.id == conversation_id:
            return conversation

    # Not found - return None per FR-155
    return None

get_message_by_id

get_message_by_id(
    file_path: Path, message_id: str, *, conversation_id: str | None = None
) -> tuple[Message, Conversation] | None

Retrieve specific message by UUID with parent conversation context.

Searches for a message by ID, optionally scoped to a specific conversation for performance optimization. Returns both the message and its parent conversation to provide full context.

Uses streaming search for memory efficiency - O(1) memory usage.

Parameters:

  • file_path (Path, required): Path to OpenAI export JSON file
  • message_id (str, required): UUID of message to retrieve
  • conversation_id (str | None, default None): Optional conversation UUID to scope the search (performance hint)

Returns:

  • tuple[Message, Conversation] | None: Tuple of (Message, Conversation) if found, None otherwise. The Conversation is the parent containing the message.

Raises:

  • FileNotFoundError: If the file doesn't exist
  • ParseError: If the JSON is malformed

Example
adapter = OpenAIAdapter()

# Search with conversation hint (faster)
result = adapter.get_message_by_id(
    Path("export.json"),
    "msg-123",
    conversation_id="conv-456"
)

# Search all conversations (slower)
result = adapter.get_message_by_id(
    Path("export.json"),
    "msg-123"
)

if result:
    message, conversation = result
    print(f"Message: {message.content}")
    print(f"From conversation: {conversation.title}")
else:
    print("Message not found")
Performance
  • With conversation_id:
    • Time: O(N) where N = conversations until match
    • Memory: O(1) for file size, O(M) for single conversation
  • Without conversation_id:
    • Time: O(N*M) where N = conversations, M = messages per conversation
    • Memory: O(1) for file size, O(M) for single conversation
  • Early termination: Returns immediately when match found
Design Notes

Returns tuple instead of just Message to provide conversation context (title, timestamps, other messages) which is valuable for CLI display and analysis workflows.

Source code in src/echomine/adapters/openai.py
def get_message_by_id(
    self,
    file_path: Path,
    message_id: str,
    *,
    conversation_id: str | None = None,
) -> tuple[Message, Conversation] | None:
    """Retrieve specific message by UUID with parent conversation context.

    Searches for a message by ID, optionally scoped to a specific conversation
    for performance optimization. Returns both the message and its parent
    conversation to provide full context.

    Uses streaming search for memory efficiency - O(1) memory usage.

    Args:
        file_path: Path to OpenAI export JSON file
        message_id: UUID of message to retrieve
        conversation_id: Optional conversation UUID to scope search (performance hint)

    Returns:
        Tuple of (Message, Conversation) if found, None otherwise.
        The Conversation is the parent containing the message.

    Raises:
        FileNotFoundError: If file doesn't exist
        ParseError: If JSON is malformed

    Example:
        ```python
        adapter = OpenAIAdapter()

        # Search with conversation hint (faster)
        result = adapter.get_message_by_id(
            Path("export.json"),
            "msg-123",
            conversation_id="conv-456"
        )

        # Search all conversations (slower)
        result = adapter.get_message_by_id(
            Path("export.json"),
            "msg-123"
        )

        if result:
            message, conversation = result
            print(f"Message: {message.content}")
            print(f"From conversation: {conversation.title}")
        else:
            print("Message not found")
        ```

    Performance:
        - With conversation_id:
            - Time: O(N) where N = conversations until match
            - Memory: O(1) for file size, O(M) for single conversation
        - Without conversation_id:
            - Time: O(N*M) where N = conversations, M = messages per conversation
            - Memory: O(1) for file size, O(M) for single conversation
        - Early termination: Returns immediately when match found

    Design Notes:
        Returns tuple instead of just Message to provide conversation context
        (title, timestamps, other messages) which is valuable for CLI display
        and analysis workflows.
    """
    # If conversation_id provided, search only that conversation
    if conversation_id is not None:
        conv = self.get_conversation_by_id(file_path, conversation_id)
        if conv is not None:
            msg = conv.get_message_by_id(message_id)
            if msg is not None:
                return (msg, conv)
        return None

    # Otherwise, stream all conversations and search each
    for conv in self.stream_conversations(file_path):
        msg = conv.get_message_by_id(message_id)
        if msg is not None:
            return (msg, conv)

    # Not found in any conversation
    return None

Usage Examples

Basic Setup

from echomine import OpenAIAdapter
from pathlib import Path

# Create adapter (stateless, reusable)
adapter = OpenAIAdapter()
export_file = Path("conversations.json")

Stream All Conversations

Memory-efficient iteration over all conversations:

# Stream conversations (O(1) memory usage)
for conversation in adapter.stream_conversations(export_file):
    print(f"[{conversation.created_at.date()}] {conversation.title}")
    print(f"  Messages: {len(conversation.messages)}")
    print(f"  ID: {conversation.id}")

Search with Keywords

Full-text search with BM25 ranking:

from echomine.models import SearchQuery

# Create search query
query = SearchQuery(
    keywords=["algorithm", "leetcode"],
    limit=10
)

# Execute search
for result in adapter.search(export_file, query):
    print(f"[{result.score:.2f}] {result.conversation.title}")

Get Conversation by ID

Retrieve a specific conversation:

# Get specific conversation
conversation = adapter.get_conversation_by_id(export_file, "conv-abc123")

if conversation:
    print(f"Found: {conversation.title}")
    print(f"Messages: {len(conversation.messages)}")
else:
    print("Conversation not found")

Progress Reporting

Track progress for long-running operations:

def progress_callback(count: int) -> None:
    """Called by the adapter every 100 conversations (FR-069)."""
    print(f"Processed {count:,} conversations...")

# Stream with progress reporting
for conversation in adapter.stream_conversations(
    export_file,
    progress_callback=progress_callback
):
    process(conversation)

Graceful Degradation

Handle malformed entries gracefully:

skipped_entries = []

def handle_skipped(conversation_id: str, reason: str) -> None:
    """Called when malformed entry is skipped."""
    skipped_entries.append({
        "id": conversation_id,
        "reason": reason,
    })

# Stream with skip handler
for conversation in adapter.stream_conversations(
    export_file,
    on_skip=handle_skipped
):
    process(conversation)

if skipped_entries:
    print(f"Skipped {len(skipped_entries)} malformed conversations")

Methods

stream_conversations()

Stream all conversations from export file.

Signature:

def stream_conversations(
    self,
    file_path: Path,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[Conversation]:
    ...

Parameters:

  • file_path: Path to OpenAI export JSON file
  • progress_callback: Optional callback invoked periodically with conversation count
  • on_skip: Optional callback invoked when malformed entry is skipped

Returns:

Iterator yielding Conversation objects.

Raises:

  • FileNotFoundError: If file does not exist
  • PermissionError: If file cannot be read
  • ParseError: If export format is invalid
  • SchemaVersionError: If export schema version is unsupported

Memory Usage: O(1) - constant memory regardless of file size.

search()

Search conversations with BM25 ranking and filtering.

Signature:

def search(
    self,
    file_path: Path,
    query: SearchQuery,
    *,
    progress_callback: Optional[Callable[[int], None]] = None,
    on_skip: Optional[Callable[[str, str], None]] = None,
) -> Iterator[SearchResult[Conversation]]:
    ...

Parameters:

  • file_path: Path to OpenAI export JSON file
  • query: Search parameters (keywords, filters, limit)
  • progress_callback: Optional callback for progress reporting
  • on_skip: Optional callback for skipped entries

Returns:

Iterator yielding SearchResult[Conversation] objects, sorted by relevance score (descending).

Raises:

Same as stream_conversations().

Performance:

  • Title-only search: <5 seconds for 10K conversations (metadata-only)
  • Keyword search: <30 seconds for 1.6GB files (full-text with BM25)

get_conversation_by_id()

Retrieve a specific conversation by ID.

Signature:

def get_conversation_by_id(
    self,
    file_path: Path,
    conversation_id: str,
) -> Optional[Conversation]:
    ...

Parameters:

  • file_path: Path to OpenAI export JSON file
  • conversation_id: ID of conversation to retrieve

Returns:

Conversation if found, None otherwise.

Raises:

Same as stream_conversations().

Performance: Early termination - stops searching after finding conversation.

Adapter Pattern

Stateless Design

OpenAIAdapter has no __init__ parameters and maintains no internal state:

# ✅ CORRECT: Reusable adapter
adapter = OpenAIAdapter()

for file in export_files:
    for conv in adapter.stream_conversations(file):
        process(conv)

Benefits:

  • Thread-safe (no shared state)
  • Reusable across multiple files
  • Simple, predictable behavior

Protocol Implementation

Implements ConversationProvider protocol:

from echomine.protocols import ConversationProvider

# Type-safe adapter usage
def process_export(
    adapter: ConversationProvider,  # Works with ANY adapter
    file_path: Path
) -> None:
    for conv in adapter.stream_conversations(file_path):
        print(conv.title)

# OpenAIAdapter implements protocol
process_export(OpenAIAdapter(), Path("export.json"))

OpenAI-Specific Behavior

Export Format

Expects OpenAI ChatGPT export JSON format:

[
  {
    "id": "conv-uuid",
    "title": "Conversation Title",
    "create_time": 1704974400.0,
    "update_time": 1704974500.0,
    "mapping": {
      "msg-uuid-1": {
        "id": "msg-uuid-1",
        "message": {
          "id": "msg-uuid-1",
          "author": {"role": "user"},
          "content": {"content_type": "text", "parts": ["Hello"]},
          "create_time": 1704974410.0
        },
        "parent": null,
        "children": ["msg-uuid-2"]
      }
    }
  }
]
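
For orientation, the mapping is a tree keyed by node ID with parent/children links. A sketch of walking it into a linear message list, independent of the adapter's internal parser (not shown here) and following only the first child at each branch:

def linearize_mapping(mapping: dict) -> list[dict]:
    # Locate the root node (its "parent" is null in the export)
    node_id = next(nid for nid, node in mapping.items() if node["parent"] is None)
    messages = []
    while node_id is not None:
        node = mapping[node_id]
        if node.get("message") is not None:
            messages.append(node["message"])
        children = node.get("children") or []
        node_id = children[0] if children else None
    return messages
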

Metadata Mapping

Provider-specific fields stored in conversation.metadata:

  • openai_model: Model used (e.g., "gpt-4")
  • openai_conversation_template_id: Template ID
  • openai_plugin_ids: List of plugin IDs used
  • openai_moderation_results: Moderation results (if any)

Example:

conversation = adapter.get_conversation_by_id(file_path, "conv-123")
if conversation:  # returns None when the ID is not found
    model = conversation.metadata.get("openai_model", "unknown")
    print(f"Model: {model}")

Role Normalization

OpenAI roles are already normalized (no mapping needed):

  • "user" → "user"
  • "assistant" → "assistant"
  • "system" → "system"

Error Handling

Exceptions

from echomine import (
    ParseError,          # Malformed JSON/structure
    ValidationError,     # Invalid data
    SchemaVersionError,  # Unsupported version
)

try:
    for conv in adapter.stream_conversations(file_path):
        process(conv)
except ParseError as e:
    print(f"Export file corrupted: {e}")
except SchemaVersionError as e:
    print(f"Unsupported export version: {e}")
except FileNotFoundError:
    print(f"File not found: {file_path}")

Graceful Degradation

Malformed conversations are skipped with warnings logged:

# Skipped entries logged as WARNING
# Processing continues for valid entries
for conv in adapter.stream_conversations(file_path):
    # Only valid conversations yielded
    process(conv)
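
The skip warnings go through the standard logging module (see the logger.warning call in the source above); to surface them, configure logging before streaming:

import logging

logging.basicConfig(level=logging.WARNING)
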

Concurrency

Thread Safety

  • Adapter instances: Thread-safe (stateless)
  • Iterators: NOT thread-safe (each thread needs its own)
from threading import Thread

adapter = OpenAIAdapter()  # SAFE: Share adapter

def worker(thread_id):
    # SAFE: Each thread creates its own iterator
    for conv in adapter.stream_conversations(file_path):
        process(conv, thread_id)

threads = [Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Multi-Process Safety

Multiple processes can read the same file concurrently:

from multiprocessing import Process

def worker(process_id):
    adapter = OpenAIAdapter()  # Each process has its own adapter
    for conv in adapter.stream_conversations(file_path):
        process(conv, process_id)

processes = [Process(target=worker, args=(i,)) for i in range(4)]
for p in processes:
    p.start()
for p in processes:
    p.join()

Performance

Memory Efficiency

  • O(1) memory usage: Constant memory regardless of file size
  • Streaming: Uses ijson for incremental parsing
  • No buffering: Yields conversations as they're parsed

Speed

  • 10K conversations: <5 seconds for listing (metadata-only)
  • 1.6GB file: <30 seconds for keyword search
  • Early termination: get_conversation_by_id stops after finding match
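
The figures above are the project's performance targets; a quick way to measure on your own export (a sketch):

import time
from pathlib import Path

from echomine.adapters import OpenAIAdapter

adapter = OpenAIAdapter()
start = time.perf_counter()
count = sum(1 for _ in adapter.stream_conversations(Path("export.json")))
elapsed = time.perf_counter() - start
print(f"Streamed {count} conversations in {elapsed:.1f}s")
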

See Also