Internal Architecture
This document provides a deep dive into AgentiBridge’s internal modules and implementation patterns.
Key Modules
| Module | Purpose | Key Functions/Classes |
|---|---|---|
| server.py | FastMCP server with 16 tools | tool handlers, main() |
| parser.py | Pure-function JSONL transcript parser | parse_transcript_entries(), scan_projects_dir() |
| store.py | SessionStore (Redis + filesystem fallback) | SessionStore, get_session_meta(), list_sessions() |
| collector.py | Background polling daemon | SessionCollector, collect_once() |
| transport.py | SSE/HTTP transport + API key auth + OAuth | run_sse_server(), auth middleware |
| oauth_provider.py | OAuth 2.1 authorization server (opt-in) | BridgeOAuthProvider |
| embeddings.py | Semantic search (Phase 2) | TranscriptEmbedder, search_semantic() |
| dispatch.py | Session restore + task dispatch (Phase 4) | restore_session_context(), dispatch_task() |
| dispatch_bridge.py | Host-side HTTP bridge for Docker dispatch | GET /health, POST /dispatch |
| claude_runner.py | Claude CLI subprocess wrapper | run_claude(), ClaudeResult |
| llm_client.py | OpenAI-compatible embeddings + chat | embed_text(), chat_completion() |
| redis_client.py | Redis helper | get_redis(), redis_key() |
| pg_client.py | Postgres + pgvector connection | get_pg(), auto-schema creation |
| config.py | Centralized env-var configuration | module-level constants |
| cli.py | CLI helper tool | run, stop, status, tunnel, bridge, locks |
| catalog.py | Knowledge catalog: memory, plans, history | scan_memory_files(), scan_plans_dir(), parse_history() |
| logging.py | Structured JSON logging | log() |
Redis + File Fallback Pattern
All stateful operations follow a consistent fallback pattern to ensure reliability even if Redis is unavailable:
Pattern Overview
```python
# 1. Try Redis first
redis_client = get_redis()
if redis_client:
    try:
        result = redis_client.get(f"{KEY_PREFIX}:sb:{key}")
        if result:
            return json.loads(result)
    except Exception as e:
        logger.warning(f"Redis error, falling back to filesystem: {e}")
# 2. Fall back to filesystem
return read_from_jsonl_file(path)
```
Key Characteristics
- Graceful degradation: If Redis is down, the bridge continues working with direct file reads
- No partial failures: Either operation succeeds completely or falls back
- Namespaced keys: All Redis keys use the `{REDIS_KEY_PREFIX}:sb:{suffix}` format
- Idempotent operations: Safe to retry without side effects
Redis Key Schema
```
agentibridge:sb:idx:all                  # Sorted set of all session IDs (score = last_update)
agentibridge:sb:idx:project:{encoded}    # Sorted set of session IDs per project
agentibridge:sb:session:{id}:meta        # Hash of session metadata fields
agentibridge:sb:session:{id}:entries     # List of JSON-serialized entries (capped at MAX_ENTRIES)
agentibridge:sb:pos:{filepath_hash}      # Byte offset for incremental transcript reading

# Phase 5 — Knowledge Catalog
agentibridge:sb:memory:{project}:(unknown)  # Hash of memory file metadata + content
agentibridge:sb:idx:memory               # Sorted set of all memory file keys (score = mtime)
agentibridge:sb:plan:{codename}          # Hash of plan metadata + content
agentibridge:sb:plan:{codename}:agents   # List of agent subplan codenames
agentibridge:sb:idx:plans                # Sorted set of all plan codenames (score = mtime)
agentibridge:sb:codename:{slug}          # Set of session IDs linked to a plan codename
agentibridge:sb:history                  # List of JSON-serialized history entries
agentibridge:sb:pos:history              # Byte offset for incremental history.jsonl parsing
```
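The namespacing convention can be sketched as a tiny helper. This is a minimal sketch: the actual `redis_key()` in redis_client.py may differ, and in the real config.py the prefix comes from an environment variable.

```python
# Assumed default prefix; in the real config.py it comes from an env var.
REDIS_KEY_PREFIX = "agentibridge"

def redis_key(*parts: str) -> str:
    """Build a namespaced key of the form {prefix}:sb:{suffix}."""
    return ":".join((REDIS_KEY_PREFIX, "sb", *parts))

# redis_key("idx", "all")                 -> "agentibridge:sb:idx:all"
# redis_key("session", "abc123", "meta")  -> "agentibridge:sb:session:abc123:meta"
```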
When Redis is Used
- list_sessions: Fast ID enumeration (`ZRANGE idx:all`, a sorted set)
- get_session: Quick metadata lookup (`HGETALL session:{id}:meta`)
- Collector: Locks prevent concurrent processing of the same project
- Transcript caching: Avoids re-parsing large JSONL files on every request
When Filesystem is Used
- Redis unavailable: All operations fall back to direct file reads
- Segment queries: Time-range filters read directly from JSONL (no caching benefit)
- Full transcript: If `MAX_ENTRIES=0` or not in Redis, reads from the file
Transcript Format
File Location
Raw transcripts are stored in: ~/.claude/projects/{path-encoded}/{session-id}.jsonl
Path encoding example:
- Project path: `/home/user/dev/myproject`
- Encoded name: `-home-user-dev-myproject`
- Full path: `~/.claude/projects/-home-user-dev-myproject/`
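The encoding in the example above is a straight character substitution. A minimal sketch (the helper name is hypothetical, and Claude Code generates these directory names itself; the bridge only needs to read them):

```python
def encode_project_path(path: str) -> str:
    """Sketch of the directory-name encoding: every '/' becomes '-'."""
    return path.replace("/", "-")

# encode_project_path("/home/user/dev/myproject") -> "-home-user-dev-myproject"
```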
Entry Types
Each line in the JSONL file is a JSON object with a type field:
Indexed types:
- `user` — User input (prompts, commands)
- `assistant` — Assistant responses (text, tool calls)
- `summary` — Session summary metadata
- `system` — System messages (hooks, errors)
Filtered types (not indexed):
- `queue-operation` — Internal task queue events
- `file-history-snapshot` — File state snapshots
- `progress` — Progress indicators
Entry Structure
```json
{
  "type": "assistant",
  "timestamp": "2026-02-20T12:34:56.789Z",
  "content": "Let me help you with that...",
  "tool_calls": [
    {
      "name": "Read",
      "parameters": {"file_path": "/path/to/file.py"}
    }
  ]
}
```
Parsing Logic
The `parser.py` module provides pure functions for incremental parsing:

```python
# Scan all projects under ~/.claude/projects/
sessions = scan_projects_dir(projects_dir)

# Parse new entries starting from a byte offset (incremental)
entries, new_offset = parse_transcript_entries(transcript_path, offset=last_offset)

# Extract session metadata (git branch, cwd, counts, etc.)
meta = parse_transcript_meta(transcript_path)
```
Indexed entry types: user, assistant, summary, system. Skipped types: progress, queue-operation, file-history-snapshot.
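The incremental contract can be sketched as follows. This is a minimal sketch of offset-based JSONL parsing with type filtering, not the actual parser.py implementation:

```python
import json

INDEXED_TYPES = {"user", "assistant", "summary", "system"}

def parse_transcript_entries(path: str, offset: int = 0) -> tuple[list[dict], int]:
    """Parse new JSONL entries from a byte offset; return (entries, new_offset)."""
    entries: list[dict] = []
    pos = offset
    with open(path, "rb") as f:
        f.seek(offset)
        for raw in f:
            if not raw.endswith(b"\n"):
                break  # partial trailing line; picked up on the next pass
            pos += len(raw)
            try:
                obj = json.loads(raw)
            except json.JSONDecodeError:
                continue  # malformed line: advance past it, index nothing
            if obj.get("type") in INDEXED_TYPES:
                entries.append(obj)
    return entries, pos
```

Calling it again with the returned offset yields only entries appended since the previous pass.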
Collector Daemon
Polling Loop
```
┌─────────────────────────────┐
│ Every POLL_INTERVAL seconds │
└──────────┬──────────────────┘
           │
           ▼
   ┌──────────────┐
   │ Scan projects│
   │ directory    │
   └──────┬───────┘
          │
          ▼
 ┌──────────────────┐
 │ For each project:│
 │ - Acquire lock   │
 │ - Find new data  │
 │ - Parse & index  │
 │ - Release lock   │
 └──────────────────┘
```
Lock Mechanism
Uses Redis locks to prevent concurrent indexing:
```python
lock_key = f"{KEY_PREFIX}:sb:lock:collect:{project_hash}"
if redis.set(lock_key, "1", nx=True, ex=300):  # 5-minute lock
    try:
        collect_project(project_path)
    finally:
        redis.delete(lock_key)
```
Without Redis, collection skips the lock and proceeds directly (no concurrent protection).
Incremental Updates
Tracks last-processed byte offset per transcript file:
```python
# Hash the path with a stable digest (Python's builtin hash() varies across runs)
filepath_hash = hashlib.sha256(filepath.encode()).hexdigest()[:16]
position_key = f"{KEY_PREFIX}:sb:pos:{filepath_hash}"
last_offset = int(redis.get(position_key) or 0)
entries, new_offset = parse_transcript_entries(filepath, offset=last_offset)
redis.set(position_key, new_offset)
```
Without Redis, positions are stored under ~/.cache/agentibridge/positions/.
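The filesystem position store can be sketched as below. The helper names are hypothetical and the actual file layout under `~/.cache/agentibridge/positions/` may differ:

```python
import hashlib
from pathlib import Path

POSITIONS_DIR = Path.home() / ".cache" / "agentibridge" / "positions"

def _position_file(filepath: str) -> Path:
    # A stable digest of the transcript path names the position file.
    digest = hashlib.sha256(filepath.encode()).hexdigest()[:16]
    return POSITIONS_DIR / digest

def load_position(filepath: str) -> int:
    try:
        return int(_position_file(filepath).read_text())
    except (FileNotFoundError, ValueError):
        return 0  # unknown file: start from the beginning

def save_position(filepath: str, offset: int) -> None:
    POSITIONS_DIR.mkdir(parents=True, exist_ok=True)
    _position_file(filepath).write_text(str(offset))
```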
Transport Layer (Phase 3)
stdio Transport
For local MCP clients (Claude Code CLI):
```
# Reads from stdin, writes to stdout
# Used when AGENTIBRIDGE_TRANSPORT=stdio
stdin -> MCP request -> process -> MCP response -> stdout
```
HTTP/SSE Transport
For remote MCP clients (ChatGPT, Claude Web, etc.):
```
GET  /health                                  -> {"status": "ok"} (public)
POST /mcp                                     -> Streamable HTTP (preferred)
GET  /sse                                     -> Server-Sent Events (legacy)
GET  /.well-known/oauth-authorization-server  -> OAuth metadata (if OAuth enabled)
POST /token, /authorize, /register, /revoke   -> OAuth 2.1 endpoints (if OAuth enabled)
```
Authentication options:
- API key: `X-API-Key: your-key` header or `?api_key=your-key` query param
- OAuth 2.1: Bearer token via `Authorization: Bearer <token>` (enabled by `OAUTH_ISSUER_URL`)
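The API-key check the auth middleware performs can be sketched as below. The header and query-param names match the list above, but the helper name and signature are hypothetical:

```python
import hmac

def check_api_key(headers: dict[str, str], query: dict[str, str], expected: str) -> bool:
    """Accept the X-API-Key header or the api_key query param (header keys lowercased)."""
    supplied = headers.get("x-api-key") or query.get("api_key") or ""
    # Constant-time comparison avoids leaking the key via timing differences.
    return hmac.compare_digest(supplied, expected)
```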
Embedding Pipeline (Phase 2)
Vector Storage
```
1. Transcript entry (text)
        ↓
2. LLM API (OpenAI-compatible)
        ↓ embed()
3. Vector (e.g., 1536 dimensions)
        ↓
4. PostgreSQL + pgvector
        ↓ similarity search
5. Ranked results
```
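pgvector's `<=>` operator computes cosine distance, which the search query converts to similarity as `1 - distance`. The underlying math, in plain Python:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Similarity = dot(a, b) / (|a| * |b|); pgvector's <=> returns 1 minus this."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

# Parallel vectors score 1.0; orthogonal vectors score 0.0.
```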
Schema
```sql
CREATE TABLE IF NOT EXISTS transcript_chunks (
    id SERIAL PRIMARY KEY,
    session_id TEXT NOT NULL,
    chunk_idx INTEGER NOT NULL,
    project TEXT NOT NULL DEFAULT '',
    project_encoded TEXT NOT NULL DEFAULT '',
    timestamp TEXT NOT NULL DEFAULT '',
    text_preview TEXT NOT NULL DEFAULT '',
    embedding vector(1536),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (session_id, chunk_idx)
);

CREATE INDEX IF NOT EXISTS idx_tc_embedding_hnsw ON transcript_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);
```
Search Query
```python
def search_semantic(query: str, limit: int = 10) -> list[dict]:
    query_vector = llm_client.embed_text(query)
    results = pg.execute("""
        SELECT session_id, text_preview,
               1 - (embedding <=> %s::vector) AS similarity
        FROM transcript_chunks
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """, (query_vector, query_vector, limit))
    return results
```
Dispatch Architecture (Phase 4)
Session Restore
restore_session_context(session_id, last_n) loads recent turns from a past session and formats them as a text block for injection into a new prompt. Returns a dict with the formatted context string and char_count.
Task Dispatch
dispatch_task(...) is fully async and fire-and-forget:
- Writes job state to `/tmp/agentibridge_jobs/{job_id}.json` immediately
- Starts an `asyncio.create_task()` and returns `job_id` to the caller
- The background task calls `run_claude()` (local subprocess or HTTP bridge)
- On completion, updates the job file with output, exit_code, duration_ms

Clients poll with `get_dispatch_job(job_id)` until status is `completed` or `failed`.
Dispatch modes:
- Local: `CLAUDE_DISPATCH_URL` is empty → runs the `claude` subprocess directly
- Bridge: `CLAUDE_DISPATCH_URL` is set → HTTP POST to `dispatch_bridge.py` on the host
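The fire-and-forget lifecycle can be sketched with `asyncio` and a JSON job file. The signature is a simplified assumption; where the real `dispatch_task()` invokes `run_claude()`, this sketch just echoes the prompt:

```python
import asyncio
import json
import time
import uuid
from pathlib import Path

JOBS_DIR = Path("/tmp/agentibridge_jobs")

def _write_job(job_id: str, **fields) -> None:
    """Persist job state so clients can poll it from the filesystem."""
    JOBS_DIR.mkdir(parents=True, exist_ok=True)
    (JOBS_DIR / f"{job_id}.json").write_text(json.dumps(fields))

async def dispatch_task(prompt: str) -> str:
    """Write pending state, start background work, return job_id immediately."""
    job_id = uuid.uuid4().hex[:12]
    _write_job(job_id, status="pending", prompt=prompt)

    async def _run() -> None:
        start = time.monotonic()
        # Stand-in for run_claude(): just echo the prompt.
        output = f"echo: {prompt}"
        _write_job(job_id, status="completed", output=output,
                   duration_ms=int((time.monotonic() - start) * 1000))

    asyncio.create_task(_run())  # fire-and-forget
    return job_id
```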
Knowledge Catalog (Phase 5)
Data Sources
Phase 5 exposes three knowledge categories from Claude Code’s local filesystem:
| Source | Location | Format |
|---|---|---|
| Memory files | ~/.claude/projects/{project}/memory/*.md | Markdown files with curated project knowledge |
| Plans | ~/.claude/plans/*.md | Implementation blueprints linked to sessions via codename/slug |
| History | ~/.claude/history.jsonl | Every user prompt across all sessions with timestamps |
Agent Plans
Plans with the suffix -agent-{hex_hash} are subplans created by agent subprocesses. They’re linked to their parent plan by stripping the suffix:
- `moonlit-rolling-reddy.md` — parent plan
- `moonlit-rolling-reddy-agent-a1b2c3.md` — agent subplan
When include_agent_plans=True, get_plan returns the parent content plus all linked agent subplans.
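The suffix stripping can be sketched as below (helper names are hypothetical, assuming the hex hash is lowercase as in the example above):

```python
import re

# Agent subplans end in "-agent-" plus a hex hash.
AGENT_SUFFIX = re.compile(r"-agent-[0-9a-f]+$")

def parent_codename(codename: str) -> str:
    """Strip the agent suffix, if any, to find the parent plan's codename."""
    return AGENT_SUFFIX.sub("", codename)

def is_agent_plan(codename: str) -> bool:
    return AGENT_SUFFIX.search(codename) is not None
```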
Incremental History Parsing
The parse_history() function uses byte-offset tracking to avoid re-reading the entire history.jsonl on every collection cycle:
- Seek to the last known byte offset
- Detect if offset is at a line boundary (peek at byte before offset)
- If mid-line, skip the partial line remainder
- Read and parse new complete lines
- Return new entries + updated byte offset
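The steps above can be sketched as a minimal function (not the actual catalog.py implementation; the line-boundary check is the interesting part):

```python
import json

def parse_history(path: str, offset: int = 0) -> tuple[list[dict], int]:
    """Incrementally read complete JSON lines from history.jsonl."""
    entries: list[dict] = []
    with open(path, "rb") as f:
        if offset > 0:
            # Peek at the byte before the offset: if it isn't a newline,
            # the offset is mid-line, so skip the partial remainder.
            f.seek(offset - 1)
            if f.read(1) != b"\n":
                f.readline()
        pos = f.tell()
        for raw in f:
            if not raw.endswith(b"\n"):
                break  # incomplete trailing line; pick it up next cycle
            pos += len(raw)
            try:
                entries.append(json.loads(raw))
            except json.JSONDecodeError:
                pass
    return entries, pos
```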
Collector Integration
`collect_once()` runs three additional scan passes after transcript indexing:
- Memory scan: `scan_memory_files()` finds `*.md` files in each project's `memory/` dir
- Plans scan: `scan_plans_dir()` reads `~/.claude/plans/` and resolves session IDs via the codename index
- History scan: `parse_history()` incrementally reads new entries from `history.jsonl`
Error Handling Patterns
Graceful Fallbacks
```python
# 1. Redis unavailable? Use filesystem
# 2. Anthropic API down? Use LLM_CHAT_MODEL
# 3. Postgres down? Disable semantic search
# 4. Lock acquisition fails? Skip (will retry next cycle)
```
Structured Errors
```python
class AgentiBridgeError(Exception):
    """Base exception with structured context."""

    def __init__(self, message: str, context: dict | None = None):
        self.message = message
        self.context = context or {}
        super().__init__(message)

class SessionNotFoundError(AgentiBridgeError):
    """Raised when a session ID cannot be resolved."""

# Usage
raise SessionNotFoundError(
    "Session not found",
    context={"session_id": session_id, "project": project_path},
)
```
Performance Characteristics
Latency Targets
- `list_sessions`: < 100ms (Redis) or < 500ms (filesystem)
- `get_session`: < 50ms (cached) or < 200ms (uncached)
- `search_sessions`: < 500ms (keyword) or < 2s (semantic)
- `collect_now`: 1-5s (depends on transcript size)
Memory Usage
- Redis: ~1KB per session metadata, ~100KB per cached transcript
- Collector: ~50MB baseline + ~1MB per 1000 transcript entries
- Embeddings: ~6KB per vector (1536 dims * 4 bytes)
Scalability Limits
- Sessions: Tested with 10,000+ sessions
- Transcripts: Individual files up to 10MB (5,000+ entries)
- Concurrent requests: 100+ (SSE transport)
Development Patterns
Adding a New Tool
- Add handler in `server.py`:

  ```python
  @mcp.tool()
  async def my_new_tool(arg: str) -> dict:
      """Tool description for MCP registry."""
      result = await store.do_something(arg)
      return {"result": result}
  ```

- Update `store.py` with business logic
- Add tests in `tests/unit/test_server.py`
- Update documentation
Adding Configuration
- Add to `config.py`:

  ```python
  MY_NEW_VAR: str = os.getenv("MY_NEW_VAR", "default")
  ```

- Add validation in `Config.__post_init__()`
- Update `docs/reference/configuration.md`
- Add to `.env.example` generation in CLI