Gira MCP Server Architecture¶
This document describes the Gira MCP server architecture, its implementation patterns, and development guidelines for contributors and integrators.
High-Level Architecture¶
System Components¶
┌─────────────────────────────────────────────────────┐
│             Claude Desktop / MCP Client             │
└──────────────────────────┬──────────────────────────┘
                           │ JSON-RPC over stdio
                           ▼
┌─────────────────────────────────────────────────────┐
│                   Gira MCP Server                   │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │   FastMCP   │  │  Security   │  │   Config    │  │
│  │  Framework  │  │   Manager   │  │   Manager   │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│                    Tool Registry                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │   Ticket    │  │    Epic     │  │   Sprint    │  │
│  │    Tools    │  │    Tools    │  │    Tools    │  │
│  │  (12 tools) │  │  (7 tools)  │  │  (8 tools)  │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
│  ┌─────────────┐                                    │
│  │    Board    │                                    │
│  │    Tools    │                                    │
│  │  (3 tools)  │                                    │
│  └─────────────┘                                    │
├─────────────────────────────────────────────────────┤
│                    Core Services                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │ Validation  │  │    Audit    │  │    Rate     │  │
│  │   Service   │  │   Logger    │  │   Limiter   │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
└──────────────────────────┬──────────────────────────┘
                           │ File System Operations
                           ▼
┌─────────────────────────────────────────────────────┐
│                   Gira Data Layer                   │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │   .gira/    │  │   Tickets   │  │    Epics    │  │
│  │  directory  │  │   (JSON)    │  │   (JSON)    │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
│  ┌─────────────┐  ┌─────────────┐                   │
│  │   Sprints   │  │    Board    │                   │
│  │   (JSON)    │  │    State    │                   │
│  └─────────────┘  └─────────────┘                   │
└─────────────────────────────────────────────────────┘
Core Framework¶
FastMCP Integration¶
The Gira MCP server is built on the FastMCP framework, providing:
# Core MCP server setup
from fastmcp import FastMCP
# Initialize MCP server with configuration
mcp = FastMCP("gira")
# Tool registration pattern
@mcp.tool(name="list_tickets")
@secure_operation("list_tickets", require_project=True)
@rate_limit(max_calls=50, window_seconds=60)
def list_tickets(
status: Optional[str] = None,
assignee: Optional[str] = None,
epic: Optional[str] = None,
limit: Optional[int] = None
) -> OperationResult:
"""List tickets with optional filtering."""
# Implementation details...
Key FastMCP Features Used:

- Automatic JSON-RPC handling for MCP protocol compliance
- Type-safe tool definitions with Pydantic models
- Built-in error handling with proper MCP error responses
- Streaming support for large result sets
- Resource management for file and directory operations
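Once tools are registered, the server is exposed to Claude Desktop over stdio. A minimal sketch of such an entry point, assuming FastMCP's standard run() method (which defaults to the stdio transport):
# Entry point sketch: the MCP client spawns this process and speaks
# JSON-RPC over the process's stdin/stdout.
if __name__ == "__main__":
    mcp.run()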
Configuration System¶
Configuration Hierarchy:

1. Environment Variables (highest priority)
2. Configuration Files (.gira/mcp-config.json)
3. Default Values (lowest priority)
from pydantic import BaseSettings, Field  # BaseSettings (pydantic v1; pydantic_settings on v2) enables env_prefix loading
from typing import Optional, List
class MCPConfig(BaseSettings):
"""Comprehensive MCP server configuration."""
# Core settings
working_directory: str = Field(default=".", description="Gira project directory")
dry_run: bool = Field(default=True, description="Enable dry run mode")
# Security settings
require_confirmation: bool = Field(default=False, description="Require confirmation for destructive operations")
blocked_operations: List[str] = Field(default=[], description="List of blocked operation names")
max_input_length: int = Field(default=10000, description="Maximum input string length")
max_list_length: int = Field(default=1000, description="Maximum list input length")
# Audit settings
audit_enabled: bool = Field(default=True, description="Enable audit logging")
verbose_logging: bool = Field(default=False, description="Include parameters in audit logs")
audit_file: str = Field(default="gira-mcp-audit.log", description="Audit log file name")
# Rate limiting (built into tools, not configurable)
# Tool-specific limits defined in security decorators
class Config:
env_prefix = "GIRA_MCP_"
case_sensitive = False
Environment Variable Examples:
# Core configuration
export GIRA_MCP_WORKING_DIRECTORY="/path/to/gira/project"
export GIRA_MCP_DRY_RUN=false
# Security configuration
export GIRA_MCP_REQUIRE_CONFIRMATION=true
export GIRA_MCP_BLOCKED_OPERATIONS="delete_ticket,archive_tickets"
# Audit configuration
export GIRA_MCP_AUDIT_ENABLED=true
export GIRA_MCP_VERBOSE_LOGGING=false
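How the three levels combine is sketched below; load_config is a hypothetical helper (not the actual Gira loader) shown only to illustrate the priority order:
import json
import os
from pathlib import Path
def load_config(working_directory: str = ".") -> MCPConfig:
    """Illustrative resolution: defaults < .gira/mcp-config.json < GIRA_MCP_* env vars."""
    values: dict = {}
    # Layer 2: configuration file overrides the field defaults.
    config_file = Path(working_directory) / ".gira" / "mcp-config.json"
    if config_file.exists():
        values.update(json.loads(config_file.read_text()))
    # Layer 1: environment variables override both lower layers.
    prefix = "GIRA_MCP_"
    for key, raw in os.environ.items():
        if key.startswith(prefix):
            # Scalars are coerced by pydantic; list-valued settings (e.g.
            # blocked_operations) would need JSON or comma parsing here.
            values[key[len(prefix):].lower()] = raw
    # Layer 3: anything not supplied above falls back to the field defaults.
    return MCPConfig(**values)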
Security Architecture¶
Multi-Layer Security Model¶
Layer 1: Working Directory Validation
class MCPSecurityManager:
def validate_path(self, path: Union[str, Path]) -> Path:
"""Ensure path is within project boundaries."""
try:
path_obj = Path(path)
# Convert relative to absolute within working directory
if not path_obj.is_absolute():
path_obj = self.working_directory / path_obj
resolved_path = path_obj.resolve()
# Prevent directory traversal
try:
resolved_path.relative_to(self.working_directory)
return resolved_path
except ValueError:
raise PermissionError(f"Path '{path}' outside project boundaries")
except (OSError, ValueError) as e:
raise PermissionError(f"Invalid path '{path}': {str(e)}")
Layer 2: Input Validation & Sanitization
def sanitize_input(self, value: Any, field_name: str = "input") -> Any:
"""Comprehensive input sanitization."""
if value is None:
return None
if isinstance(value, str):
# Length validation
if len(value) > self.max_input_length:
raise ValidationError(f"{field_name} exceeds maximum length")
# Remove dangerous characters
sanitized = value.replace('\x00', '').replace('\r\n', '\n')
# Pattern detection (log suspicious patterns)
suspicious_patterns = [
r'<script[^>]*>', # Script injection
r'javascript:', # JavaScript URLs
r'\.\./', # Directory traversal
r'[;\\|&`$]', # Shell metacharacters
]
for pattern in suspicious_patterns:
if re.search(pattern, sanitized, re.IGNORECASE):
logger.warning(f"Suspicious pattern in {field_name}: {pattern}")
return sanitized
# Handle lists and dictionaries recursively
elif isinstance(value, list):
return [self.sanitize_input(item, f"{field_name}[{i}]")
for i, item in enumerate(value)]
elif isinstance(value, dict):
return {key: self.sanitize_input(item, f"{field_name}.{key}")
for key, item in value.items()}
return value
Layer 3: Rate Limiting
def rate_limit(max_calls: int = 100, window_seconds: int = 60):
"""Sliding window rate limiter decorator."""
call_history: List[float] = []
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
now = time.time()
# Remove old calls outside window
cutoff = now - window_seconds
call_history[:] = [t for t in call_history if t > cutoff]
# Check rate limit
if len(call_history) >= max_calls:
raise PermissionError(
f"Rate limit exceeded: {max_calls} calls per {window_seconds}s"
)
call_history.append(now)
return func(*args, **kwargs)
return wrapper
return decorator
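Because call_history lives in the decorator's closure, the limit applies per decorated tool (per server process) rather than per client. A short illustration of the resulting behavior, using a throwaway function:
@rate_limit(max_calls=2, window_seconds=60)
def example_tool() -> str:
    return "ok"
example_tool()   # call 1: allowed
example_tool()   # call 2: allowed
try:
    example_tool()   # call 3 inside the same 60s window
except PermissionError as exc:
    print(exc)   # "Rate limit exceeded: 2 calls per 60s"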
Layer 4: Operation Security Decorator
def secure_operation(operation_name: str, require_project: bool = True):
"""Comprehensive security decorator for MCP operations."""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
security = get_security_manager()
# 1. Check operation permissions
security.check_operation_allowed(operation_name)
# 2. Sanitize all inputs
sanitized_kwargs = {}
for key, value in kwargs.items():
sanitized_kwargs[key] = security.sanitize_input(value, key)
# 3. Validate and normalize IDs
if 'ticket_id' in sanitized_kwargs:
sanitized_kwargs['ticket_id'] = security.validate_ticket_id(
sanitized_kwargs['ticket_id']
)
if 'epic_id' in sanitized_kwargs:
sanitized_kwargs['epic_id'] = security.validate_epic_id(
sanitized_kwargs['epic_id']
)
# 4. Validate file paths
for path_field in ['path', 'file_path', 'directory']:
if path_field in sanitized_kwargs:
sanitized_kwargs[path_field] = security.validate_path(
sanitized_kwargs[path_field]
)
# 5. Execute with audit logging
try:
security.log_operation(f"{operation_name}.start", {
'function': func.__name__,
'args_count': len(args),
'kwargs_keys': list(sanitized_kwargs.keys())
})
result = func(*args, **sanitized_kwargs)
security.log_operation(f"{operation_name}.success", {
'function': func.__name__,
'success': True
})
return result
except Exception as e:
security.log_operation(f"{operation_name}.error", {
'function': func.__name__,
'success': False,
'error_type': type(e).__name__,
'error_message': str(e)
}, success=False)
raise
return wrapper
return decorator
Tool Architecture¶
Tool Registration System¶
Base Tool Pattern:
# Standard tool registration pattern
@mcp.tool(name="tool_name")
@secure_operation("tool_name", require_project=True)
@rate_limit(max_calls=30, window_seconds=60)
def tool_function(
required_param: str,
optional_param: Optional[str] = None
) -> OperationResult:
"""Tool description for AI understanding."""
# 1. Parameter validation
if not required_param:
raise ValidationError("Required parameter missing")
# 2. Business logic implementation
try:
# Core tool functionality
result_data = perform_operation(required_param, optional_param)
# 3. Return standardized result
return OperationResult(
success=True,
data=result_data,
message=f"Operation completed successfully"
)
except Exception as e:
logger.error(f"Tool operation failed: {e}")
return OperationResult(
success=False,
error=str(e),
message=f"Operation failed: {str(e)}"
)
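Every tool returns an OperationResult. The model itself is not shown in this document; a minimal sketch consistent with how the tools above use it (field names are inferred from those call sites, not the canonical Gira definition):
from typing import Any, Optional
from pydantic import BaseModel
class OperationResult(BaseModel):
    """Standard envelope returned by every MCP tool (sketch)."""
    success: bool
    data: Any = None             # payload for successful operations
    error: Optional[str] = None  # machine-readable error detail
    message: str = ""            # human-readable summary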
Tool Categories & Design Patterns¶
Query Tools (Read-only operations):
# Pattern: High rate limits, no confirmation needed
@rate_limit(max_calls=100, window_seconds=60) # High limit
def list_tickets(...) -> OperationResult:
"""List/query operations - high frequency, read-only."""
pass
@rate_limit(max_calls=50, window_seconds=60) # Medium limit
def search_tickets(...) -> OperationResult:
"""Search operations - moderate frequency, more expensive."""
pass
Creation Tools (Create new resources):
# Pattern: Medium rate limits, optional confirmation
@rate_limit(max_calls=30, window_seconds=60) # Medium limit
def create_ticket(...) -> OperationResult:
"""Creation operations - moderate frequency, state changing."""
# Confirmation logic if configured
if security.require_confirmation and not dry_run:
# In real implementation, this would prompt user
pass
# Creation logic
pass
Update Tools (Modify existing resources):
# Pattern: Medium rate limits, audit logging
@rate_limit(max_calls=30, window_seconds=60)
def update_ticket(...) -> OperationResult:
"""Update operations - moderate frequency, important changes."""
# Extra validation for updates
if not ticket_exists(ticket_id):
raise NotFoundError(f"Ticket {ticket_id} not found")
# Update logic with change tracking
pass
Bulk Tools (Mass operations):
# Pattern: Low rate limits, always require confirmation
@rate_limit(max_calls=5, window_seconds=60) # Very low limit
def bulk_update_tickets(...) -> OperationResult:
"""Bulk operations - low frequency, high impact."""
# Always require confirmation for bulk operations
if len(ticket_ids) > 50:
raise ValidationError("Bulk operation too large (max 50 tickets)")
# Bulk processing with detailed results
pass
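A sketch of how such a bulk tool might enforce the cap and report per-ticket outcomes; update_single_ticket is a hypothetical helper used only for illustration:
from typing import List
def bulk_update_sketch(ticket_ids: List[str], status: str) -> OperationResult:
    """Illustrative only: apply one change to many tickets, collecting per-ticket results."""
    if len(ticket_ids) > 50:
        raise ValidationError("Bulk operation too large (max 50 tickets)")
    results = {"updated": [], "failed": {}}
    for ticket_id in ticket_ids:
        try:
            update_single_ticket(ticket_id, status)  # hypothetical helper
            results["updated"].append(ticket_id)
        except Exception as exc:
            # Record the failure but keep processing the remaining tickets.
            results["failed"][ticket_id] = str(exc)
    return OperationResult(
        success=not results["failed"],
        data=results,
        message=f"Updated {len(results['updated'])} of {len(ticket_ids)} tickets",
    )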
Data Layer Architecture¶
File System Organization¶
.gira/
├── config.json              # Project configuration
├── board/                   # Active board tickets
│   ├── todo/                # Todo status tickets
│   ├── in_progress/         # In progress tickets
│   ├── review/              # Review status tickets
│   └── done/                # Completed tickets
├── backlog/                 # Backlog tickets
│   └── [ticket-files].json
├── epics/                   # Epic definitions
│   └── [epic-files].json
├── sprints/                 # Sprint data
│   ├── active/              # Active sprints
│   ├── completed/           # Completed sprints
│   └── planned/             # Planned sprints
├── audit/                   # Audit logs (if configured)
│   └── mcp-operations.log
└── cache/                   # Temporary cache files
    └── board-state.json
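This layout makes a ticket's location a pure function of its ID and status. A small sketch of that mapping, assuming the directory names shown above (the repository further below relies on the same convention):
from pathlib import Path
BOARD_STATUSES = {"todo", "in_progress", "review", "done"}
def ticket_path(gira_dir: Path, ticket_id: str, status: str) -> Path:
    """Illustrative mapping from (id, status) to a JSON file on disk."""
    if status in BOARD_STATUSES:
        return gira_dir / "board" / status / f"{ticket_id}.json"
    return gira_dir / "backlog" / f"{ticket_id}.json"
# e.g. ticket_path(Path(".gira"), "GCM-123", "todo") -> .gira/board/todo/GCM-123.json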
Data Models¶
Ticket Model:
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
class Ticket(BaseModel):
"""Complete ticket data model."""
# Core identification
id: str # e.g., "GCM-123"
title: str
description: Optional[str] = None
# Classification
type: str = "task" # task, bug, feature, epic
priority: str = "medium" # low, medium, high, critical
status: str = "todo" # todo, in_progress, review, done
# Assignment and ownership
assignee: Optional[str] = None
reporter: Optional[str] = None
epic: Optional[str] = None
sprint: Optional[str] = None
# Estimation and tracking
story_points: Optional[int] = None
time_estimate: Optional[int] = None # hours
time_spent: Optional[int] = None # hours
# Relationships
parent: Optional[str] = None # Parent ticket
children: List[str] = [] # Child tickets
dependencies: List[str] = [] # Blocking dependencies
blocks: List[str] = [] # Tickets this blocks
# Metadata
labels: List[str] = []
custom_fields: dict = {}
# Timestamps
created_at: datetime
updated_at: datetime
resolved_at: Optional[datetime] = None
# Comments and history
comments: List[dict] = []
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
Epic Model:
class Epic(BaseModel):
"""Epic data model for large initiatives."""
# Core identification
id: str # e.g., "EPIC-001"
title: str
description: Optional[str] = None
# Status and ownership
status: str = "draft" # draft, active, completed, archived
owner: Optional[str] = None
# Timeline
start_date: Optional[datetime] = None
target_date: Optional[datetime] = None
actual_completion_date: Optional[datetime] = None
# Organization
labels: List[str] = []
custom_fields: dict = {}
# Progress tracking (calculated)
total_tickets: int = 0
completed_tickets: int = 0
progress_percentage: float = 0.0
# Metadata
created_at: datetime
updated_at: datetime
Sprint Model:
class Sprint(BaseModel):
"""Sprint data model for agile planning."""
# Core identification
id: str # e.g., "SPRINT-2024-01-15"
name: str
goal: Optional[str] = None
# Status and timeline
status: str = "planned" # planned, active, completed
start_date: datetime
end_date: datetime
actual_start_date: Optional[datetime] = None
actual_end_date: Optional[datetime] = None
# Team and capacity
team: Optional[str] = None
capacity: Optional[int] = None # story points
# Progress tracking (calculated)
total_story_points: int = 0
completed_story_points: int = 0
velocity: float = 0.0 # points per day
# Metadata
created_at: datetime
updated_at: datetime
# Burndown data
burndown_data: List[dict] = [] # Daily progress snapshots
Data Access Patterns¶
Repository Pattern Implementation:
class TicketRepository:
"""Repository for ticket data operations."""
def __init__(self, working_directory: Path):
self.working_directory = working_directory
self.gira_dir = working_directory / '.gira'
def find_by_id(self, ticket_id: str) -> Optional[Ticket]:
"""Find ticket by ID across all locations."""
# Search in board directories
for status_dir in ['todo', 'in_progress', 'review', 'done']:
ticket_path = self.gira_dir / 'board' / status_dir / f"{ticket_id}.json"
if ticket_path.exists():
return self._load_ticket(ticket_path)
# Search in backlog
ticket_path = self.gira_dir / 'backlog' / f"{ticket_id}.json"
if ticket_path.exists():
return self._load_ticket(ticket_path)
return None
def find_by_status(self, status: str) -> List[Ticket]:
"""Find all tickets with given status."""
tickets = []
status_dir = self.gira_dir / 'board' / status
if status_dir.exists():
for ticket_file in status_dir.glob("*.json"):
ticket = self._load_ticket(ticket_file)
if ticket:
tickets.append(ticket)
return tickets
def save(self, ticket: Ticket) -> None:
"""Save ticket to appropriate location based on status."""
if ticket.status == 'backlog':
ticket_dir = self.gira_dir / 'backlog'
else:
ticket_dir = self.gira_dir / 'board' / ticket.status
ticket_dir.mkdir(parents=True, exist_ok=True)
ticket_path = ticket_dir / f"{ticket.id}.json"
with open(ticket_path, 'w') as f:
json.dump(ticket.dict(), f, indent=2, default=str)
def move_ticket(self, ticket_id: str, new_status: str) -> bool:
"""Move ticket between status directories."""
ticket = self.find_by_id(ticket_id)
if not ticket:
return False
# Remove from old location
old_path = self._get_ticket_path(ticket_id, ticket.status)
if old_path and old_path.exists():
old_path.unlink()
# Update status and save to new location
ticket.status = new_status
ticket.updated_at = datetime.now()
self.save(ticket)
return True
def _load_ticket(self, ticket_path: Path) -> Optional[Ticket]:
"""Load ticket from JSON file."""
try:
with open(ticket_path, 'r') as f:
data = json.load(f)
return Ticket(**data)
except (json.JSONDecodeError, ValidationError) as e:
logger.error(f"Failed to load ticket {ticket_path}: {e}")
return None
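A short usage sketch of the repository (the project path is illustrative):
from pathlib import Path
repo = TicketRepository(Path("/path/to/gira/project"))
# Look up a ticket regardless of which status directory it lives in.
ticket = repo.find_by_id("GCM-123")
# Move it to review: the old file is removed and the ticket is rewritten under board/review/.
if ticket:
    repo.move_ticket("GCM-123", "review")
# Query everything currently in review.
in_review = repo.find_by_status("review")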
Event System & Hooks¶
Event-Driven Architecture¶
from typing import Callable, Dict, List
from enum import Enum
class EventType(Enum):
"""MCP operation event types."""
TICKET_CREATED = "ticket.created"
TICKET_UPDATED = "ticket.updated"
TICKET_MOVED = "ticket.moved"
EPIC_CREATED = "epic.created"
SPRINT_STARTED = "sprint.started"
SPRINT_COMPLETED = "sprint.completed"
class EventManager:
"""Event system for MCP operations."""
def __init__(self):
self.handlers: Dict[EventType, List[Callable]] = {}
def register_handler(self, event_type: EventType, handler: Callable):
"""Register event handler."""
if event_type not in self.handlers:
self.handlers[event_type] = []
self.handlers[event_type].append(handler)
def emit(self, event_type: EventType, data: dict):
"""Emit event to all registered handlers."""
if event_type in self.handlers:
for handler in self.handlers[event_type]:
try:
handler(data)
except Exception as e:
logger.error(f"Event handler failed: {e}")
# Example event handlers
def update_epic_progress(data: dict):
"""Update epic progress when tickets change."""
if 'epic_id' in data:
# Recalculate epic progress
pass
def update_sprint_burndown(data: dict):
"""Update sprint burndown when tickets move."""
if 'sprint_id' in data:
# Update burndown data
pass
# Register handlers
event_manager = EventManager()
event_manager.register_handler(EventType.TICKET_UPDATED, update_epic_progress)
event_manager.register_handler(EventType.TICKET_MOVED, update_sprint_burndown)
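Tools then emit events after successful writes. A minimal sketch of the emitting side; the payload keys and the ticket_repository instance are assumptions chosen to match the handlers above:
from typing import Optional
def move_ticket_and_notify(ticket_id: str, new_status: str, sprint_id: Optional[str] = None) -> bool:
    """Illustrative only: repository write followed by an event emission."""
    moved = ticket_repository.move_ticket(ticket_id, new_status)  # repository from the data layer
    if moved:
        event_manager.emit(EventType.TICKET_MOVED, {
            "ticket_id": ticket_id,
            "new_status": new_status,
            "sprint_id": sprint_id,
        })
    return moved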
Testing Architecture¶
Test Organization¶
tests/
├── unit/                         # Unit tests
│   ├── test_security.py          # Security manager tests
│   ├── test_tools.py             # Individual tool tests
│   └── test_models.py            # Data model tests
├── integration/                  # Integration tests
│   ├── test_mcp_server.py        # Full MCP protocol tests
│   ├── test_workflows.py         # End-to-end workflows
│   └── test_security_integration.py
├── fixtures/                     # Test data
│   ├── sample_project/           # Complete .gira project
│   └── test_configs/             # Various configurations
└── helpers/                      # Test utilities
    ├── mcp_client.py             # Mock MCP client
    └── test_helpers.py           # Common test functions
Test Patterns¶
Security Testing:
import pytest
from gira.mcp.security import MCPSecurityManager, ValidationError, PermissionError
class TestMCPSecurity:
"""Comprehensive security testing."""
def test_path_traversal_prevention(self):
"""Test path traversal attack prevention."""
security = MCPSecurityManager()
# These should be blocked
with pytest.raises(PermissionError):
security.validate_path("../../../etc/passwd")
with pytest.raises(PermissionError):
security.validate_path("/etc/passwd")
# These should be allowed
valid_path = security.validate_path("tickets/GCM-123.json")
assert valid_path.is_relative_to(security.working_directory)
def test_input_sanitization(self):
"""Test input sanitization patterns."""
security = MCPSecurityManager()
# Malicious script injection
malicious_input = "<script>alert('xss')</script>"
sanitized = security.sanitize_input(malicious_input, "title")
assert "<script>" not in sanitized
# SQL injection patterns
sql_injection = "'; DROP TABLE tickets; --"
sanitized = security.sanitize_input(sql_injection, "query")
# Should be sanitized but logged
def test_rate_limiting(self):
"""Test rate limiting functionality."""
from gira.mcp.security import rate_limit
@rate_limit(max_calls=3, window_seconds=1)
def test_function():
return "success"
# Should work for first 3 calls
for _ in range(3):
assert test_function() == "success"
# Should fail on 4th call
with pytest.raises(PermissionError):
test_function()
Tool Testing:
import pytest
from unittest.mock import Mock, patch
from gira.mcp.tools.tickets import list_tickets
class TestTicketTools:
"""Test ticket management tools."""
@pytest.fixture
def mock_security_manager(self):
"""Mock security manager for testing."""
with patch('gira.mcp.tools.tickets.get_security_manager') as mock:
security = Mock()
security.sanitize_input.side_effect = lambda x, _: x
security.validate_ticket_id.side_effect = lambda x: f"GCM-{x}" if x.isdigit() else x
mock.return_value = security
yield security
def test_list_tickets_basic(self, mock_security_manager):
"""Test basic ticket listing."""
result = list_tickets()
assert result.success
assert isinstance(result.data, list)
def test_list_tickets_with_filters(self, mock_security_manager):
"""Test ticket listing with filters."""
result = list_tickets(
status="todo",
assignee="test@example.com",
limit=10
)
assert result.success
# Verify filtering logic
def test_list_tickets_security_validation(self, mock_security_manager):
"""Test security validation in tool calls."""
# Should call security sanitization
list_tickets(status="todo")
mock_security_manager.sanitize_input.assert_called()
Integration Testing:
import pytest
import json
from pathlib import Path
from unittest.mock import patch
from gira.mcp.server import create_mcp_server
class TestMCPIntegration:
"""Integration tests for full MCP protocol."""
@pytest.fixture
def test_project(self, tmp_path):
"""Create test Gira project."""
gira_dir = tmp_path / '.gira'
gira_dir.mkdir()
# Create sample config
config = {
"project_name": "Test Project",
"statuses": ["todo", "in_progress", "review", "done"]
}
(gira_dir / 'config.json').write_text(json.dumps(config))
# Create sample tickets
board_dir = gira_dir / 'board' / 'todo'
board_dir.mkdir(parents=True)
ticket = {
"id": "GCM-123",
"title": "Test ticket",
"status": "todo",
"created_at": "2024-01-01T00:00:00",
"updated_at": "2024-01-01T00:00:00"
}
(board_dir / 'GCM-123.json').write_text(json.dumps(ticket))
return tmp_path
def test_full_workflow(self, test_project):
"""Test complete ticket workflow."""
# Initialize MCP server
with patch.dict('os.environ', {'GIRA_MCP_WORKING_DIRECTORY': str(test_project)}):
server = create_mcp_server()
# Test list tickets
result = server.call_tool("list_tickets", {})
assert result["success"]
assert len(result["data"]) == 1
# Test create ticket
result = server.call_tool("create_ticket", {
"title": "New test ticket",
"description": "Created via integration test"
})
assert result["success"]
new_ticket_id = result["data"]["id"]
# Test update ticket
result = server.call_tool("update_ticket", {
"ticket_id": new_ticket_id,
"status": "in_progress"
})
assert result["success"]
# Verify ticket was moved
result = server.call_tool("list_tickets", {"status": "in_progress"})
assert len(result["data"]) == 1
assert result["data"][0]["id"] == new_ticket_id
Performance Optimization¶
Caching Strategy¶
from functools import lru_cache
from typing import Optional
import time
class BoardStateCache:
"""Intelligent caching for expensive board operations."""
def __init__(self, ttl_seconds: int = 60):
self.ttl_seconds = ttl_seconds
self._cache = {}
self._timestamps = {}
def get_board_state(self, cache_key: str) -> Optional[dict]:
"""Get cached board state if valid."""
if cache_key in self._cache:
if time.time() - self._timestamps[cache_key] < self.ttl_seconds:
return self._cache[cache_key]
else:
# Expired cache
del self._cache[cache_key]
del self._timestamps[cache_key]
return None
def set_board_state(self, cache_key: str, state: dict):
"""Cache board state."""
self._cache[cache_key] = state
self._timestamps[cache_key] = time.time()
def invalidate(self, pattern: str = None):
"""Invalidate cache entries."""
if pattern:
keys_to_remove = [k for k in self._cache.keys() if pattern in k]
for key in keys_to_remove:
del self._cache[key]
del self._timestamps[key]
else:
self._cache.clear()
self._timestamps.clear()
# Global cache instance
board_cache = BoardStateCache()
@lru_cache(maxsize=100)
def get_ticket_by_id(ticket_id: str) -> Optional[dict]:
"""Cached ticket lookup."""
# Implementation with caching
pass
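A brief sketch of how a tool might consult the cache on reads and invalidate it after writes; the key scheme and compute_board_state helper are assumptions:
def get_cached_board_state(project: str) -> dict:
    """Illustrative read path: serve from cache when fresh, rebuild otherwise."""
    cache_key = f"board:{project}"
    state = board_cache.get_board_state(cache_key)
    if state is None:
        state = compute_board_state(project)  # hypothetical expensive aggregation
        board_cache.set_board_state(cache_key, state)
    return state
# After any write (create/update/move), drop the affected entries so the
# next read recomputes them, e.g.:
# board_cache.invalidate("board:")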
Lazy Loading Patterns¶
class LazyTicketLoader:
"""Lazy loading for ticket collections."""
def __init__(self, ticket_ids: List[str]):
self.ticket_ids = ticket_ids
self._loaded_tickets = {}
def load_ticket(self, ticket_id: str) -> Optional[dict]:
"""Load individual ticket on demand."""
if ticket_id not in self._loaded_tickets:
self._loaded_tickets[ticket_id] = self._fetch_ticket(ticket_id)
return self._loaded_tickets[ticket_id]
def load_batch(self, count: int = 10) -> List[dict]:
"""Load tickets in batches."""
unloaded = [tid for tid in self.ticket_ids[:count]
if tid not in self._loaded_tickets]
for ticket_id in unloaded:
self.load_ticket(ticket_id)
return [self._loaded_tickets[tid] for tid in self.ticket_ids[:count]]
def _fetch_ticket(self, ticket_id: str) -> Optional[dict]:
"""Actual ticket loading logic."""
# Implementation
pass
Monitoring & Observability¶
Metrics Collection¶
import time
from collections import defaultdict
from typing import Dict, Any
class MCPMetrics:
"""Metrics collection for MCP operations."""
def __init__(self):
self.operation_counts = defaultdict(int)
self.operation_times = defaultdict(list)
self.error_counts = defaultdict(int)
self.rate_limit_hits = defaultdict(int)
def record_operation(self, operation: str, duration: float, success: bool):
"""Record operation metrics."""
self.operation_counts[operation] += 1
self.operation_times[operation].append(duration)
if not success:
self.error_counts[operation] += 1
def record_rate_limit(self, operation: str):
"""Record rate limit violation."""
self.rate_limit_hits[operation] += 1
def get_summary(self) -> Dict[str, Any]:
"""Get metrics summary."""
return {
'total_operations': sum(self.operation_counts.values()),
'average_response_time': self._calculate_avg_response_time(),
'error_rate': self._calculate_error_rate(),
'top_operations': self._get_top_operations(),
'rate_limit_violations': sum(self.rate_limit_hits.values())
}
def _calculate_avg_response_time(self) -> float:
"""Calculate overall average response time."""
all_times = []
for times in self.operation_times.values():
all_times.extend(times)
return sum(all_times) / len(all_times) if all_times else 0.0
def _calculate_error_rate(self) -> float:
"""Calculate overall error rate."""
total_ops = sum(self.operation_counts.values())
total_errors = sum(self.error_counts.values())
return (total_errors / total_ops) * 100 if total_ops > 0 else 0.0
# Global metrics instance
mcp_metrics = MCPMetrics()
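A sketch of how operations could feed these metrics, for example via a small decorator wrapped around tool functions; this wiring is an assumption, not the server's actual instrumentation:
import functools
def track_metrics(operation: str):
    """Illustrative decorator: time the call and record success or failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                mcp_metrics.record_operation(operation, time.time() - start, success=True)
                return result
            except Exception:
                mcp_metrics.record_operation(operation, time.time() - start, success=False)
                raise
        return wrapper
    return decorator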
Health Checks¶
class HealthChecker:
"""System health monitoring."""
def __init__(self, working_directory: Path):
self.working_directory = working_directory
self.gira_dir = working_directory / '.gira'
def check_health(self) -> Dict[str, Any]:
"""Comprehensive health check."""
health = {
'status': 'healthy',
'checks': {}
}
# Check project structure
health['checks']['project_structure'] = self._check_project_structure()
# Check file system permissions
health['checks']['permissions'] = self._check_permissions()
# Check data integrity
health['checks']['data_integrity'] = self._check_data_integrity()
# Check system resources
health['checks']['resources'] = self._check_resources()
# Determine overall status
if any(check['status'] == 'error' for check in health['checks'].values()):
health['status'] = 'error'
elif any(check['status'] == 'warning' for check in health['checks'].values()):
health['status'] = 'warning'
return health
def _check_project_structure(self) -> Dict[str, Any]:
"""Check Gira project structure integrity."""
required_dirs = ['board', 'backlog', 'epics', 'sprints']
missing_dirs = []
for dir_name in required_dirs:
if not (self.gira_dir / dir_name).exists():
missing_dirs.append(dir_name)
if missing_dirs:
return {
'status': 'error',
'message': f'Missing directories: {missing_dirs}'
}
return {'status': 'ok', 'message': 'Project structure valid'}
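The remaining checks follow the same shape. As an example, a plausible sketch of _check_permissions (illustrative, not the shipped implementation):
import os
from typing import Any, Dict
def _check_permissions(self) -> Dict[str, Any]:
    """Illustrative sketch; would sit on HealthChecker next to _check_project_structure."""
    if not self.gira_dir.exists():
        return {'status': 'error', 'message': '.gira directory not found'}
    if not os.access(self.gira_dir, os.R_OK | os.W_OK):
        return {'status': 'error', 'message': '.gira directory is not readable and writable'}
    return {'status': 'ok', 'message': 'File system permissions OK'}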
This architecture documentation gives developers the technical foundation needed to understand, extend, and maintain the Gira MCP server.