Gira MCP Server Architecture

This document describes the Gira MCP server's architecture, implementation patterns, and development guidelines for contributors and integrators.

๐Ÿ—๏ธ High-Level Architecture

System Components

┌─────────────────────────────────────────────────────────────┐
│                    Claude Desktop / MCP Client              │
└─────────────────────┬───────────────────────────────────────┘
                      │ JSON-RPC over stdio
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                   Gira MCP Server                           │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │   FastMCP   │  │  Security   │  │    Config   │          │
│  │ Framework   │  │  Manager    │  │  Manager    │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
├─────────────────────────────────────────────────────────────┤
│                   Tool Registry                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │   Ticket    │  │    Epic     │  │   Sprint    │          │
│  │   Tools     │  │    Tools    │  │    Tools    │          │
│  │  (12 tools) │  │  (7 tools)  │  │  (8 tools)  │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
│  ┌─────────────┐                                            │
│  │    Board    │                                            │
│  │    Tools    │                                            │
│  │  (3 tools)  │                                            │
│  └─────────────┘                                            │
├─────────────────────────────────────────────────────────────┤
│                   Core Services                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │  Validation │  │   Audit     │  │    Rate     │          │
│  │   Service   │  │  Logger     │  │   Limiter   │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
└─────────────────────┬───────────────────────────────────────┘
                      │ File System Operations
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                   Gira Data Layer                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │   .gira/    │  │  Tickets    │  │   Epics     │          │
│  │ directory   │  │  (JSON)     │  │   (JSON)    │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
│  ┌─────────────┐  ┌─────────────┐                           │
│  │   Sprints   │  │    Board    │                           │
│  │   (JSON)    │  │   State     │                           │
│  └─────────────┘  └─────────────┘                           │
└─────────────────────────────────────────────────────────────┘

🔧 Core Framework

FastMCP Integration

The Gira MCP server is built on the FastMCP framework. A typical tool registration looks like this:

# Core MCP server setup
from typing import Optional

from fastmcp import FastMCP

# Initialize MCP server with configuration
mcp = FastMCP("gira")

# Tool registration pattern
@mcp.tool(name="list_tickets")
@secure_operation("list_tickets", require_project=True)
@rate_limit(max_calls=50, window_seconds=60)
def list_tickets(
    status: Optional[str] = None,
    assignee: Optional[str] = None,
    epic: Optional[str] = None,
    limit: Optional[int] = None
) -> OperationResult:
    """List tickets with optional filtering."""
    # Implementation details...

Key FastMCP Features Used:

- Automatic JSON-RPC handling for MCP protocol compliance
- Type-safe tool definitions with Pydantic models
- Built-in error handling with proper MCP error responses
- Streaming support for large result sets
- Resource management for file and directory operations

Configuration System

Configuration Hierarchy:

1. Environment Variables (highest priority)
2. Configuration Files (.gira/mcp-config.json)
3. Default Values (lowest priority)

from pydantic import BaseModel, Field
from typing import Optional, List

class MCPConfig(BaseModel):
    """Comprehensive MCP server configuration."""

    # Core settings
    working_directory: str = Field(default=".", description="Gira project directory")
    dry_run: bool = Field(default=True, description="Enable dry run mode")

    # Security settings
    require_confirmation: bool = Field(default=False, description="Require confirmation for destructive operations")
    blocked_operations: List[str] = Field(default=[], description="List of blocked operation names")
    max_input_length: int = Field(default=10000, description="Maximum input string length")
    max_list_length: int = Field(default=1000, description="Maximum list input length")

    # Audit settings
    audit_enabled: bool = Field(default=True, description="Enable audit logging")
    verbose_logging: bool = Field(default=False, description="Include parameters in audit logs")
    audit_file: str = Field(default="gira-mcp-audit.log", description="Audit log file name")

    # Rate limiting (built into tools, not configurable)
    # Tool-specific limits defined in security decorators

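    class Config:
        # Note: pydantic only reads environment variables for BaseSettings
        # subclasses; on a plain BaseModel these options have no effect, so
        # the GIRA_MCP_* variables must be loaded separately.
        env_prefix = "GIRA_MCP_"
        case_sensitive = False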

Environment Variable Examples:

# Core configuration
export GIRA_MCP_WORKING_DIRECTORY="/path/to/gira/project"
export GIRA_MCP_DRY_RUN=false

# Security configuration  
export GIRA_MCP_REQUIRE_CONFIRMATION=true
export GIRA_MCP_BLOCKED_OPERATIONS="delete_ticket,archive_tickets"

# Audit configuration
export GIRA_MCP_AUDIT_ENABLED=true
export GIRA_MCP_VERBOSE_LOGGING=false
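
The three-level hierarchy described above can be illustrated with a small standard-library loader. This is only a sketch under stated assumptions: `DEFAULTS`, `_coerce`, and `load_settings` are hypothetical names, only three of the settings are included, and comma-separated parsing of list values is inferred from the `GIRA_MCP_BLOCKED_OPERATIONS` example rather than taken from the Gira source.

```python
import json
import os
from pathlib import Path
from typing import Any, Dict, Mapping, Optional

# Hypothetical subset of the defaults, for illustration only.
DEFAULTS: Dict[str, Any] = {
    "working_directory": ".",
    "dry_run": True,
    "blocked_operations": [],
}


def _coerce(raw: str, default: Any) -> Any:
    """Convert an environment string to the type of the default value."""
    if isinstance(default, bool):  # check bool first: bool is a subclass of int
        return raw.strip().lower() in {"1", "true", "yes", "on"}
    if isinstance(default, int):
        return int(raw)
    if isinstance(default, list):
        return [item.strip() for item in raw.split(",") if item.strip()]
    return raw


def load_settings(project_dir: Path,
                  env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Merge defaults, then the config file, then GIRA_MCP_* variables."""
    env = os.environ if env is None else env
    settings = dict(DEFAULTS)  # lowest priority

    config_file = project_dir / ".gira" / "mcp-config.json"
    if config_file.is_file():
        file_values = json.loads(config_file.read_text(encoding="utf-8"))
        settings.update({k: v for k, v in file_values.items() if k in DEFAULTS})

    for key, default in DEFAULTS.items():
        raw = env.get(f"GIRA_MCP_{key.upper()}")
        if raw is not None:  # highest priority
            settings[key] = _coerce(raw, default)
    return settings
```

Passing `env` explicitly, instead of always reading `os.environ`, keeps the precedence rules easy to test without touching the real process environment.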

๐Ÿ›ก๏ธ Security Architecture

Multi-Layer Security Model

Layer 1: Working Directory Validation

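from pathlib import Path
from typing import Union

class MCPSecurityManager:
    def validate_path(self, path: Union[str, Path]) -> Path:
        """Ensure path is within project boundaries."""
        # Assumes self.working_directory is an absolute, already resolved Path
        try:
            path_obj = Path(path)

            # Convert relative to absolute within working directory
            if not path_obj.is_absolute():
                path_obj = self.working_directory / path_obj

            # resolve() collapses ".." segments and follows symlinks
            resolved_path = path_obj.resolve()
        except (OSError, ValueError) as e:
            raise PermissionError(f"Invalid path '{path}': {e}") from e

        # Prevent directory traversal. The check sits outside the try block
        # above so a boundary violation is not re-wrapped as "Invalid path"
        # (the built-in PermissionError is a subclass of OSError).
        try:
            resolved_path.relative_to(self.working_directory)
        except ValueError:
            raise PermissionError(f"Path '{path}' outside project boundaries") from None

        return resolved_path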
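
The containment check in Layer 1 relies on resolving a path first and only then comparing it against the project root. The standalone sketch below shows that behavior on a temporary directory; `is_inside` is a hypothetical helper written for this illustration, not part of the Gira API, and the absolute-path example assumes a POSIX-style filesystem.

```python
import tempfile
from pathlib import Path


def is_inside(base: Path, candidate: str) -> bool:
    """Return True if candidate resolves to a location under base."""
    base = base.resolve()
    target = Path(candidate)
    if not target.is_absolute():
        target = base / target
    try:
        # relative_to raises ValueError when target is not below base
        target.resolve().relative_to(base)
        return True
    except ValueError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp)
    print(is_inside(project, "tickets/GCM-123.json"))  # stays inside the project
    print(is_inside(project, "../../etc/passwd"))       # escapes through ".."
    print(is_inside(project, "/etc/passwd"))            # absolute path elsewhere
```

Comparing resolved `Path` objects avoids the pitfalls of string prefix checks, where a sibling directory such as `/srv/project-backup` would appear to start with `/srv/project`.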

Layer 2: Input Validation & Sanitization

import re
from typing import Any

def sanitize_input(self, value: Any, field_name: str = "input") -> Any:
    """Comprehensive input sanitization."""
    if value is None:
        return None

    if isinstance(value, str):
        # Length validation
        if len(value) > self.max_input_length:
            raise ValidationError(f"{field_name} exceeds maximum length")

        # Strip NUL bytes and normalize Windows line endings
        sanitized = value.replace('\x00', '').replace('\r\n', '\n')

        # Pattern detection (suspicious patterns are logged, not removed)
        suspicious_patterns = [
            r'<script[^>]*>',  # Script injection
            r'javascript:',     # JavaScript URLs
            r'\.\./',           # Directory traversal
            r'[;\\|&`$]',       # Shell metacharacters
        ]

        for pattern in suspicious_patterns:
            if re.search(pattern, sanitized, re.IGNORECASE):
                logger.warning(f"Suspicious pattern in {field_name}: {pattern}")

        return sanitized

    # Handle lists and dictionaries recursively
    if isinstance(value, list):
        if len(value) > self.max_list_length:
            raise ValidationError(f"{field_name} exceeds maximum list length")
        return [self.sanitize_input(item, f"{field_name}[{i}]")
                for i, item in enumerate(value)]

    if isinstance(value, dict):
        return {key: self.sanitize_input(item, f"{field_name}.{key}")
                for key, item in value.items()}

    return value

Layer 3: Rate Limiting

import time
from functools import wraps
from typing import Any, Callable, List, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

def rate_limit(max_calls: int = 100, window_seconds: int = 60):
    """Sliding window rate limiter decorator."""
    def decorator(func: F) -> F:
        # Each decorated function keeps its own call history
        call_history: List[float] = []

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Monotonic time is unaffected by system clock adjustments
            now = time.monotonic()

            # Remove old calls outside window
            cutoff = now - window_seconds
            call_history[:] = [t for t in call_history if t > cutoff]

            # Check rate limit
            if len(call_history) >= max_calls:
                raise PermissionError(
                    f"Rate limit exceeded: {max_calls} calls per {window_seconds}s"
                )

            call_history.append(now)
            return func(*args, **kwargs)

        return wrapper
    return decorator
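
To make the sliding-window behavior easier to follow without waiting for real time to pass, here is a minimal sketch of the same idea with an injectable clock. `SlidingWindow` and its `clock` parameter are assumptions made for this illustration; the decorator above does not expose such a hook.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindow:
    """Allow at most max_calls within any window_seconds-long interval."""

    def __init__(self, max_calls: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.clock = clock
        self.calls: Deque[float] = deque()

    def allow(self) -> bool:
        now = self.clock()
        # Drop timestamps that have slid out of the window
        while self.calls and self.calls[0] <= now - self.window_seconds:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            return False
        self.calls.append(now)
        return True


# Drive the limiter with a fake clock instead of sleeping
fake_now = [0.0]
limiter = SlidingWindow(max_calls=3, window_seconds=60, clock=lambda: fake_now[0])
first_burst = [limiter.allow() for _ in range(4)]  # three allowed, fourth rejected
fake_now[0] = 61.0                                 # the whole window has passed
after_wait = limiter.allow()                       # allowed again
```

A deque keeps the pruning step cheap because expired timestamps are always at the left end.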

Layer 4: Operation Security Decorator

def secure_operation(operation_name: str, require_project: bool = True):
    """Comprehensive security decorator for MCP operations."""
    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args, **kwargs):
            security = get_security_manager()

            # 1. Check operation permissions
            security.check_operation_allowed(operation_name)

            # 2. Sanitize all inputs
            sanitized_kwargs = {}
            for key, value in kwargs.items():
                sanitized_kwargs[key] = security.sanitize_input(value, key)

            # 3. Validate and normalize IDs
            if 'ticket_id' in sanitized_kwargs:
                sanitized_kwargs['ticket_id'] = security.validate_ticket_id(
                    sanitized_kwargs['ticket_id']
                )

            if 'epic_id' in sanitized_kwargs:
                sanitized_kwargs['epic_id'] = security.validate_epic_id(
                    sanitized_kwargs['epic_id']
                )

            # 4. Validate file paths
            for path_field in ['path', 'file_path', 'directory']:
                if path_field in sanitized_kwargs:
                    sanitized_kwargs[path_field] = security.validate_path(
                        sanitized_kwargs[path_field]
                    )

            # 5. Execute with audit logging
            try:
                security.log_operation(f"{operation_name}.start", {
                    'function': func.__name__,
                    'args_count': len(args),
                    'kwargs_keys': list(sanitized_kwargs.keys())
                })

                result = func(*args, **sanitized_kwargs)

                security.log_operation(f"{operation_name}.success", {
                    'function': func.__name__,
                    'success': True
                })

                return result

            except Exception as e:
                security.log_operation(f"{operation_name}.error", {
                    'function': func.__name__,
                    'success': False,
                    'error_type': type(e).__name__,
                    'error_message': str(e)
                }, success=False)
                raise

        return wrapper
    return decorator
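
Because tools stack `secure_operation` above `rate_limit`, the order in which the wrappers run matters. The sketch below uses a hypothetical tracing decorator, `layer`, as a stand-in for the real decorators to show that the topmost decorator runs first.

```python
from functools import wraps
from typing import Callable, List

trace: List[str] = []


def layer(name: str) -> Callable:
    """Build a decorator that records when its wrapper runs."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            trace.append(f"enter {name}")
            try:
                return func(*args, **kwargs)
            finally:
                trace.append(f"exit {name}")
        return wrapper
    return decorator


@layer("secure_operation")  # applied last, runs first
@layer("rate_limit")        # applied first, runs second
def list_tickets() -> str:
    trace.append("tool body")
    return "ok"


list_tickets()
print(trace)
```

With this ordering, a rate-limit rejection is raised inside the security wrapper, so it passes through the audit logging in the `except` branch of `secure_operation` before reaching the caller.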

📊 Tool Architecture

Tool Registration System

Base Tool Pattern:

# Standard tool registration pattern
@mcp.tool(name="tool_name")
@secure_operation("tool_name", require_project=True)
@rate_limit(max_calls=30, window_seconds=60)
def tool_function(
    required_param: str,
    optional_param: Optional[str] = None
) -> OperationResult:
    """Tool description for AI understanding."""

    # 1. Parameter validation
    if not required_param:
        raise ValidationError("Required parameter missing")

    # 2. Business logic implementation
    try:
        # Core tool functionality
        result_data = perform_operation(required_param, optional_param)

        # 3. Return standardized result
        return OperationResult(
            success=True,
            data=result_data,
            message="Operation completed successfully"
        )

    except Exception as e:
        logger.error(f"Tool operation failed: {e}")
        return OperationResult(
            success=False,
            error=str(e),
            message=f"Operation failed: {str(e)}"
        )
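
The tool pattern returns an `OperationResult`, whose definition is not shown in this document. A minimal sketch of such a result envelope follows, assuming only the four fields used in the calls above (`success`, `data`, `message`, `error`); the dataclass form and the `to_dict` helper are illustrative guesses, not the actual Gira class.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class OperationResult:
    """Hypothetical result envelope; field names mirror the calls above."""
    success: bool
    message: str = ""
    data: Any = None
    error: Optional[str] = None

    def to_dict(self) -> dict:
        """Drop unset fields so the serialized payload stays compact."""
        return {k: v for k, v in asdict(self).items() if v is not None}


ok = OperationResult(success=True, data=[{"id": "GCM-123"}],
                     message="Operation completed successfully")
failed = OperationResult(success=False, error="not found",
                         message="Operation failed: not found")
```

Returning the same shape for success and failure lets an MCP client branch on `success` without inspecting exception types.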

Tool Categories & Design Patterns

Query Tools (Read-only operations):

# Pattern: High rate limits, no confirmation needed
@rate_limit(max_calls=100, window_seconds=60)  # High limit
def list_tickets(...) -> OperationResult:
    """List/query operations - high frequency, read-only."""
    pass

@rate_limit(max_calls=50, window_seconds=60)   # Medium limit  
def search_tickets(...) -> OperationResult:
    """Search operations - moderate frequency, more expensive."""
    pass

Creation Tools (Create new resources):

# Pattern: Medium rate limits, optional confirmation
@rate_limit(max_calls=30, window_seconds=60)   # Medium limit
def create_ticket(...) -> OperationResult:
    """Creation operations - moderate frequency, state changing."""
    # Confirmation logic if configured
    if security.require_confirmation and not dry_run:
        # In real implementation, this would prompt user
        pass

    # Creation logic
    pass

Update Tools (Modify existing resources):

# Pattern: Medium rate limits, audit logging
@rate_limit(max_calls=30, window_seconds=60)
def update_ticket(...) -> OperationResult:
    """Update operations - moderate frequency, important changes."""
    # Extra validation for updates
    if not ticket_exists(ticket_id):
        raise NotFoundError(f"Ticket {ticket_id} not found")

    # Update logic with change tracking
    pass

Bulk Tools (Mass operations):

# Pattern: Low rate limits, always require confirmation
@rate_limit(max_calls=5, window_seconds=60)    # Very low limit
def bulk_update_tickets(...) -> OperationResult:
    """Bulk operations - low frequency, high impact."""
    # Bulk operations always require confirmation and are capped in size
    if len(ticket_ids) > 50:
        raise ValidationError("Bulk operation too large (max 50 tickets)")

    # Bulk processing with detailed results
    pass
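
The "bulk processing with detailed results" step could be structured so that one failing ticket does not abort the rest of the batch. The following is a sketch under that assumption; `bulk_apply`, `MAX_BULK_SIZE`, and the shape of the returned report are hypothetical.

```python
from typing import Callable, Dict, List

MAX_BULK_SIZE = 50  # same cap as the bulk_update_tickets pattern above


def bulk_apply(ticket_ids: List[str],
               update: Callable[[str], None]) -> Dict[str, object]:
    """Apply update to each ticket and report per-ticket outcomes."""
    if len(ticket_ids) > MAX_BULK_SIZE:
        raise ValueError(f"Bulk operation too large (max {MAX_BULK_SIZE} tickets)")

    succeeded: List[str] = []
    failed: Dict[str, str] = {}
    for ticket_id in ticket_ids:
        try:
            update(ticket_id)
            succeeded.append(ticket_id)
        except Exception as exc:  # one bad ticket should not abort the batch
            failed[ticket_id] = str(exc)

    return {"succeeded": succeeded, "failed": failed, "total": len(ticket_ids)}


def fake_update(ticket_id: str) -> None:
    """Stand-in for a real update; fails for one specific ticket."""
    if ticket_id == "GCM-2":
        raise LookupError("ticket not found")


report = bulk_apply(["GCM-1", "GCM-2", "GCM-3"], fake_update)
```

Reporting failures per ticket gives the caller enough detail to retry only the items that did not go through.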

๐Ÿ—„๏ธ Data Layer Architecture

File System Organization

.gira/
├── config.json              # Project configuration
├── board/                   # Active board tickets
│   ├── todo/               # Todo status tickets
│   ├── in_progress/        # In progress tickets
│   ├── review/             # Review status tickets
│   └── done/               # Completed tickets
├── backlog/                # Backlog tickets
│   └── [ticket-files].json
├── epics/                  # Epic definitions
│   └── [epic-files].json
├── sprints/                # Sprint data
│   ├── active/            # Active sprints
│   ├── completed/         # Completed sprints
│   └── planned/           # Planned sprints
├── audit/                 # Audit logs (if configured)
│   └── mcp-operations.log
└── cache/                 # Temporary cache files
    └── board-state.json
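
Since a ticket's status is encoded by the directory it lives in, a board summary can be derived by counting files alone. The sketch below assumes the layout shown above, with one JSON file per ticket; `count_tickets` and `BOARD_STATUSES` are hypothetical names.

```python
from pathlib import Path
from typing import Dict

BOARD_STATUSES = ("todo", "in_progress", "review", "done")


def _count_json(directory: Path) -> int:
    """Number of *.json files directly inside directory (0 if it is missing)."""
    if not directory.is_dir():
        return 0
    return sum(1 for _ in directory.glob("*.json"))


def count_tickets(gira_dir: Path) -> Dict[str, int]:
    """Count ticket files per board column, plus the backlog."""
    counts = {status: _count_json(gira_dir / "board" / status)
              for status in BOARD_STATUSES}
    counts["backlog"] = _count_json(gira_dir / "backlog")
    return counts
```

For example, `count_tickets(Path(".gira"))` returns a dictionary with one entry per board column and one for the backlog, without parsing any ticket file.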

Data Models

Ticket Model:

from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime

class Ticket(BaseModel):
    """Complete ticket data model."""

    # Core identification
    id: str                    # e.g., "GCM-123"
    title: str
    description: Optional[str] = None

    # Classification
    type: str = "task"         # task, bug, feature, epic
    priority: str = "medium"   # low, medium, high, critical
    status: str = "todo"       # todo, in_progress, review, done, backlog

    # Assignment and ownership
    assignee: Optional[str] = None
    reporter: Optional[str] = None
    epic: Optional[str] = None
    sprint: Optional[str] = None

    # Estimation and tracking
    story_points: Optional[int] = None
    time_estimate: Optional[int] = None  # hours
    time_spent: Optional[int] = None     # hours

    # Relationships
    parent: Optional[str] = None         # Parent ticket
    children: List[str] = []             # Child tickets
    dependencies: List[str] = []         # Blocking dependencies
    blocks: List[str] = []               # Tickets this blocks

    # Metadata
    labels: List[str] = []
    custom_fields: dict = {}

    # Timestamps
    created_at: datetime
    updated_at: datetime
    resolved_at: Optional[datetime] = None

    # Comments and history
    comments: List[dict] = []

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

Epic Model:

class Epic(BaseModel):
    """Epic data model for large initiatives."""

    # Core identification
    id: str                    # e.g., "EPIC-001"
    title: str
    description: Optional[str] = None

    # Status and ownership
    status: str = "draft"      # draft, active, completed, archived
    owner: Optional[str] = None

    # Timeline
    start_date: Optional[datetime] = None
    target_date: Optional[datetime] = None
    actual_completion_date: Optional[datetime] = None

    # Organization
    labels: List[str] = []
    custom_fields: dict = {}

    # Progress tracking (calculated)
    total_tickets: int = 0
    completed_tickets: int = 0
    progress_percentage: float = 0.0

    # Metadata
    created_at: datetime
    updated_at: datetime
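
The three "calculated" progress fields of the Epic model could be derived from the statuses of the tickets assigned to the epic. This sketch assumes that only tickets with status `done` count as completed and that `progress_percentage` is expressed on a 0 to 100 scale; neither detail is stated in the model, and `epic_progress` is a hypothetical helper.

```python
from typing import Dict, Iterable


def epic_progress(ticket_statuses: Iterable[str]) -> Dict[str, float]:
    """Derive the calculated Epic fields from its tickets' statuses."""
    statuses = list(ticket_statuses)
    total = len(statuses)
    completed = sum(1 for status in statuses if status == "done")
    # An epic without tickets reports 0% rather than dividing by zero
    percentage = round(100.0 * completed / total, 1) if total else 0.0
    return {
        "total_tickets": total,
        "completed_tickets": completed,
        "progress_percentage": percentage,
    }


progress = epic_progress(["done", "todo", "done", "review"])
```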

Sprint Model:

class Sprint(BaseModel):
    """Sprint data model for agile planning."""

    # Core identification
    id: str                    # e.g., "SPRINT-2024-01-15"
    name: str
    goal: Optional[str] = None

    # Status and timeline
    status: str = "planned"    # planned, active, completed
    start_date: datetime
    end_date: datetime
    actual_start_date: Optional[datetime] = None
    actual_end_date: Optional[datetime] = None

    # Team and capacity
    team: Optional[str] = None
    capacity: Optional[int] = None  # story points

    # Progress tracking (calculated)
    total_story_points: int = 0
    completed_story_points: int = 0
    velocity: float = 0.0      # points per day

    # Metadata
    created_at: datetime
    updated_at: datetime

    # Burndown data
    burndown_data: List[dict] = []  # Daily progress snapshots
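
The sprint's `velocity` (points per day) and the daily entries in `burndown_data` are described as calculated values. One plausible way to compute them is sketched below; the snapshot keys (`date`, `remaining`) and the choice to count at least one elapsed day are assumptions, not the documented format.

```python
from datetime import date
from typing import Dict


def burndown_snapshot(day: date, total_points: int,
                      completed_points: int) -> Dict[str, object]:
    """One daily burndown entry: story points still open on that day."""
    return {"date": day.isoformat(), "remaining": total_points - completed_points}


def velocity(start: date, today: date, completed_points: int) -> float:
    """Average story points completed per elapsed day (at least one day)."""
    elapsed_days = max((today - start).days, 1)
    return completed_points / elapsed_days


snapshot = burndown_snapshot(date(2024, 1, 19), total_points=30, completed_points=12)
points_per_day = velocity(date(2024, 1, 15), date(2024, 1, 19), completed_points=12)
```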

Data Access Patterns

Repository Pattern Implementation:

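import json
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Optional

from pydantic import ValidationError

logger = logging.getLogger(__name__)

class TicketRepository: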
    """Repository for ticket data operations."""

    def __init__(self, working_directory: Path):
        self.working_directory = working_directory
        self.gira_dir = working_directory / '.gira'

    def find_by_id(self, ticket_id: str) -> Optional[Ticket]:
        """Find ticket by ID across all locations."""
        # Search in board directories
        for status_dir in ['todo', 'in_progress', 'review', 'done']:
            ticket_path = self.gira_dir / 'board' / status_dir / f"{ticket_id}.json"
            if ticket_path.exists():
                return self._load_ticket(ticket_path)

        # Search in backlog
        ticket_path = self.gira_dir / 'backlog' / f"{ticket_id}.json"
        if ticket_path.exists():
            return self._load_ticket(ticket_path)

        return None

    def _status_dir(self, status: str) -> Path:
        """Directory that holds tickets with the given status."""
        if status == 'backlog':
            return self.gira_dir / 'backlog'
        return self.gira_dir / 'board' / status

    def find_by_status(self, status: str) -> List[Ticket]:
        """Find all tickets with given status."""
        tickets = []
        status_dir = self._status_dir(status)

        if status_dir.exists():
            for ticket_file in status_dir.glob("*.json"):
                ticket = self._load_ticket(ticket_file)
                if ticket:
                    tickets.append(ticket)

        return tickets

    def save(self, ticket: Ticket) -> None:
        """Save ticket to appropriate location based on status."""
        ticket_dir = self._status_dir(ticket.status)
        ticket_dir.mkdir(parents=True, exist_ok=True)
        ticket_path = ticket_dir / f"{ticket.id}.json"

        with open(ticket_path, 'w') as f:
            json.dump(ticket.dict(), f, indent=2, default=str)

    def move_ticket(self, ticket_id: str, new_status: str) -> bool:
        """Move ticket between status directories."""
        ticket = self.find_by_id(ticket_id)
        if not ticket:
            return False

        # Remember the current file location before the status changes
        old_path = self._status_dir(ticket.status) / f"{ticket_id}.json"

        # Update status and write the new file first, so a failed write
        # never leaves the ticket without a file on disk
        ticket.status = new_status
        ticket.updated_at = datetime.now()
        self.save(ticket)

        # Remove the old file unless the ticket stayed in the same directory
        new_path = self._status_dir(new_status) / f"{ticket_id}.json"
        if old_path != new_path and old_path.exists():
            old_path.unlink()

        return True

    def _load_ticket(self, ticket_path: Path) -> Optional[Ticket]:
        """Load ticket from JSON file."""
        try:
            with open(ticket_path, 'r') as f:
                data = json.load(f)
            return Ticket(**data)
        except (json.JSONDecodeError, ValidationError) as e:
            logger.error(f"Failed to load ticket {ticket_path}: {e}")
            return None
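
Because every ticket is a separate JSON file, an interrupted write can leave a truncated file that later fails to parse in `_load_ticket`. A common mitigation, shown here as a sketch and not as what the repository actually does, is to write to a temporary file in the same directory and rename it into place. `write_json_atomic` is a hypothetical helper.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic(path: Path, payload: dict) -> None:
    """Write payload to path so readers never observe a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives next to the target so os.replace stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=2, default=str)
        os.replace(tmp_name, path)  # atomic swap of the finished file
    except BaseException:
        os.unlink(tmp_name)  # clean up the partial temp file
        raise
```

The `.tmp` suffix keeps in-flight files out of the `*.json` globs used by `find_by_status`.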

🔄 Event System & Hooks

Event-Driven Architecture

from typing import Callable, Dict, List
from enum import Enum

class EventType(Enum):
    """MCP operation event types."""
    TICKET_CREATED = "ticket.created"
    TICKET_UPDATED = "ticket.updated"
    TICKET_MOVED = "ticket.moved"
    EPIC_CREATED = "epic.created"
    SPRINT_STARTED = "sprint.started"
    SPRINT_COMPLETED = "sprint.completed"

class EventManager:
    """Event system for MCP operations."""

    def __init__(self):
        self.handlers: Dict[EventType, List[Callable]] = {}

    def register_handler(self, event_type: EventType, handler: Callable):
        """Register event handler."""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    def emit(self, event_type: EventType, data: dict):
        """Emit event to all registered handlers."""
        if event_type in self.handlers:
            for handler in self.handlers[event_type]:
                try:
                    handler(data)
                except Exception as e:
                    logger.error(f"Event handler failed: {e}")

# Example event handlers
def update_epic_progress(data: dict):
    """Update epic progress when tickets change."""
    if 'epic_id' in data:
        # Recalculate epic progress
        pass

def update_sprint_burndown(data: dict):
    """Update sprint burndown when tickets move."""
    if 'sprint_id' in data:
        # Update burndown data
        pass

# Register handlers
event_manager = EventManager()
event_manager.register_handler(EventType.TICKET_UPDATED, update_epic_progress)
event_manager.register_handler(EventType.TICKET_MOVED, update_sprint_burndown)
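
A notable property of the `emit` method above is that a failing handler is logged and skipped, so the remaining handlers still run. The reduced sketch below demonstrates that isolation with plain functions; the module-level `handlers` registry, the return value of `emit`, and the logger name are simplifications made for this example.

```python
import logging
from collections import defaultdict
from typing import Callable, DefaultDict, List

logger = logging.getLogger("gira.mcp.events")  # hypothetical logger name
handlers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)


def emit(event_name: str, data: dict) -> int:
    """Call every handler for event_name; return how many succeeded."""
    delivered = 0
    for handler in handlers[event_name]:
        try:
            handler(data)
            delivered += 1
        except Exception:
            # Log with traceback and continue with the next handler
            logger.exception("Event handler %s failed", handler.__name__)
    return delivered


seen: List[str] = []


def broken_handler(data: dict) -> None:
    raise RuntimeError("simulated failure")


def recording_handler(data: dict) -> None:
    seen.append(data["ticket_id"])


handlers["ticket.moved"] += [broken_handler, recording_handler]
delivered = emit("ticket.moved",
                 {"ticket_id": "GCM-123", "sprint_id": "SPRINT-2024-01-15"})
```

Here `recording_handler` still receives the event even though `broken_handler`, registered before it, raised.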

🧪 Testing Architecture

Test Organization

tests/
├── unit/                   # Unit tests
│   ├── test_security.py   # Security manager tests
│   ├── test_tools.py      # Individual tool tests
│   └── test_models.py     # Data model tests
├── integration/           # Integration tests
│   ├── test_mcp_server.py # Full MCP protocol tests
│   ├── test_workflows.py  # End-to-end workflows
│   └── test_security_integration.py
├── fixtures/              # Test data
│   ├── sample_project/    # Complete .gira project
│   └── test_configs/      # Various configurations
└── helpers/               # Test utilities
    ├── mcp_client.py      # Mock MCP client
    └── test_helpers.py    # Common test functions

Test Patterns

Security Testing:

import pytest
from gira.mcp.security import MCPSecurityManager, ValidationError, PermissionError

class TestMCPSecurity:
    """Comprehensive security testing."""

    def test_path_traversal_prevention(self):
        """Test path traversal attack prevention."""
        security = MCPSecurityManager()

        # These should be blocked
        with pytest.raises(PermissionError):
            security.validate_path("../../../etc/passwd")

        with pytest.raises(PermissionError):
            security.validate_path("/etc/passwd")

        # These should be allowed
        valid_path = security.validate_path("tickets/GCM-123.json")
        assert valid_path.is_relative_to(security.working_directory)

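    def test_input_sanitization(self, caplog):
        """Test input sanitization patterns."""
        security = MCPSecurityManager()

        # NUL bytes are stripped and CRLF line endings are normalized
        assert security.sanitize_input("a\x00b\r\nc", "title") == "ab\nc"

        # Script injection is logged as suspicious; the text itself is
        # returned unchanged, so output escaping remains the caller's job
        malicious_input = "<script>alert('xss')</script>"
        sanitized = security.sanitize_input(malicious_input, "title")
        assert sanitized == malicious_input
        assert "Suspicious pattern in title" in caplog.text

        # SQL injection patterns contain ';', which is logged the same way
        sql_injection = "'; DROP TABLE tickets; --"
        sanitized = security.sanitize_input(sql_injection, "query")
        assert sanitized == sql_injection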

    def test_rate_limiting(self):
        """Test rate limiting functionality."""
        from gira.mcp.security import rate_limit

        @rate_limit(max_calls=3, window_seconds=1)
        def test_function():
            return "success"

        # Should work for first 3 calls
        for _ in range(3):
            assert test_function() == "success"

        # Should fail on 4th call
        with pytest.raises(PermissionError):
            test_function()

Tool Testing:

import pytest
from unittest.mock import Mock, patch
from gira.mcp.tools.tickets import list_tickets

class TestTicketTools:
    """Test ticket management tools."""

    @pytest.fixture
    def mock_security_manager(self):
        """Mock security manager for testing."""
        with patch('gira.mcp.tools.tickets.get_security_manager') as mock:
            security = Mock()
            security.sanitize_input.side_effect = lambda x, _: x
            security.validate_ticket_id.side_effect = lambda x: f"GCM-{x}" if x.isdigit() else x
            mock.return_value = security
            yield security

    def test_list_tickets_basic(self, mock_security_manager):
        """Test basic ticket listing."""
        result = list_tickets()

        assert result.success
        assert isinstance(result.data, list)

    def test_list_tickets_with_filters(self, mock_security_manager):
        """Test ticket listing with filters."""
        result = list_tickets(
            status="todo",
            assignee="test@example.com",
            limit=10
        )

        assert result.success
        # Verify filtering logic

    def test_list_tickets_security_validation(self, mock_security_manager):
        """Test security validation in tool calls."""
        # Should call security sanitization
        list_tickets(status="todo")

        mock_security_manager.sanitize_input.assert_called()

Integration Testing:

import pytest
import json
from pathlib import Path
from unittest.mock import patch
from gira.mcp.server import create_mcp_server

class TestMCPIntegration:
    """Integration tests for full MCP protocol."""

    @pytest.fixture
    def test_project(self, tmp_path):
        """Create test Gira project."""
        gira_dir = tmp_path / '.gira'
        gira_dir.mkdir()

        # Create sample config
        config = {
            "project_name": "Test Project",
            "statuses": ["todo", "in_progress", "review", "done"]
        }
        (gira_dir / 'config.json').write_text(json.dumps(config))

        # Create sample tickets
        board_dir = gira_dir / 'board' / 'todo'
        board_dir.mkdir(parents=True)

        ticket = {
            "id": "GCM-123",
            "title": "Test ticket",
            "status": "todo",
            "created_at": "2024-01-01T00:00:00",
            "updated_at": "2024-01-01T00:00:00"
        }
        (board_dir / 'GCM-123.json').write_text(json.dumps(ticket))

        return tmp_path

    def test_full_workflow(self, test_project):
        """Test complete ticket workflow."""
        # Initialize MCP server (dry run is disabled so writes reach disk)
        with patch.dict('os.environ', {
            'GIRA_MCP_WORKING_DIRECTORY': str(test_project),
            'GIRA_MCP_DRY_RUN': 'false',
        }):
            server = create_mcp_server()

            # Test list tickets
            result = server.call_tool("list_tickets", {})
            assert result["success"]
            assert len(result["data"]) == 1

            # Test create ticket
            result = server.call_tool("create_ticket", {
                "title": "New test ticket",
                "description": "Created via integration test"
            })
            assert result["success"]
            new_ticket_id = result["data"]["id"]

            # Test update ticket
            result = server.call_tool("update_ticket", {
                "ticket_id": new_ticket_id,
                "status": "in_progress"
            })
            assert result["success"]

            # Verify ticket was moved
            result = server.call_tool("list_tickets", {"status": "in_progress"})
            assert len(result["data"]) == 1
            assert result["data"][0]["id"] == new_ticket_id

🚀 Performance Optimization

Caching Strategy

from functools import lru_cache
from typing import Optional
import time

class BoardStateCache:
    """Intelligent caching for expensive board operations."""

    def __init__(self, ttl_seconds: int = 60):
        self.ttl_seconds = ttl_seconds
        self._cache = {}
        self._timestamps = {}

    def get_board_state(self, cache_key: str) -> Optional[dict]:
        """Get cached board state if valid."""
        if cache_key in self._cache:
            if time.time() - self._timestamps[cache_key] < self.ttl_seconds:
                return self._cache[cache_key]
            else:
                # Expired cache
                del self._cache[cache_key]
                del self._timestamps[cache_key]

        return None

    def set_board_state(self, cache_key: str, state: dict):
        """Cache board state."""
        self._cache[cache_key] = state
        self._timestamps[cache_key] = time.time()

    def invalidate(self, pattern: Optional[str] = None):
        """Invalidate cache entries."""
        if pattern:
            keys_to_remove = [k for k in self._cache.keys() if pattern in k]
            for key in keys_to_remove:
                del self._cache[key]
                del self._timestamps[key]
        else:
            self._cache.clear()
            self._timestamps.clear()

# Global cache instance
board_cache = BoardStateCache()

@lru_cache(maxsize=100)
def get_ticket_by_id(ticket_id: str) -> Optional[dict]:
    """Cached ticket lookup."""
    # Implementation with caching
    pass
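
One caveat with `lru_cache` is that it has no expiry, so a cached ticket goes stale as soon as the ticket is updated. The sketch below shows one way to handle this, assuming writes go through a single function that can clear the cache. The in-memory `_TICKET_STORE`, the `GIRA-1` ID, and `update_ticket_status` are hypothetical stand-ins for illustration, not part of the Gira codebase.

```python
from functools import lru_cache
from typing import Dict, Optional

# Hypothetical in-memory stand-in for the on-disk ticket files.
_TICKET_STORE: Dict[str, dict] = {
    "GIRA-1": {"id": "GIRA-1", "status": "todo"},
}


@lru_cache(maxsize=100)
def get_ticket_by_id(ticket_id: str) -> Optional[dict]:
    """Cached lookup that returns a snapshot copy of the stored ticket."""
    ticket = _TICKET_STORE.get(ticket_id)
    return dict(ticket) if ticket is not None else None


def update_ticket_status(ticket_id: str, status: str) -> None:
    """Write path: change the store, then drop all cached lookups."""
    _TICKET_STORE[ticket_id]["status"] = status
    # lru_cache has no per-key eviction, so the whole cache is cleared.
    get_ticket_by_id.cache_clear()
```

Clearing the whole cache on every write is coarse but safe. Callers should also treat the returned dict as read-only, since `lru_cache` hands back the same object on every hit.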

Lazy Loading Patterns

from typing import List, Optional

class LazyTicketLoader:
    """Lazy loading for ticket collections."""

    def __init__(self, ticket_ids: List[str]):
        self.ticket_ids = ticket_ids
        self._loaded_tickets = {}

    def load_ticket(self, ticket_id: str) -> Optional[dict]:
        """Load individual ticket on demand."""
        if ticket_id not in self._loaded_tickets:
            self._loaded_tickets[ticket_id] = self._fetch_ticket(ticket_id)
        return self._loaded_tickets[ticket_id]

    def load_batch(self, count: int = 10) -> List[Optional[dict]]:
        """Load the first `count` tickets, fetching only those not yet loaded."""
        # load_ticket() skips IDs already in _loaded_tickets, so repeat calls are cheap.
        return [self.load_ticket(tid) for tid in self.ticket_ids[:count]]

    def _fetch_ticket(self, ticket_id: str) -> Optional[dict]:
        """Actual ticket loading logic."""
        # Implementation
        pass
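
`load_batch` above always returns the first `count` tickets. To walk an entire collection without loading it up front, a generator is one option. This is a minimal sketch, not the server's implementation: `iter_ticket_batches` and its `fetch` parameter are hypothetical names, with `fetch` standing in for whatever performs the actual load (for example `LazyTicketLoader.load_ticket`).

```python
from typing import Callable, Iterator, List, Optional


def iter_ticket_batches(
    ticket_ids: List[str],
    fetch: Callable[[str], Optional[dict]],
    batch_size: int = 10,
) -> Iterator[List[Optional[dict]]]:
    """Yield tickets in consecutive batches, fetching each batch on demand."""
    for start in range(0, len(ticket_ids), batch_size):
        # Nothing in this slice is fetched until the caller asks for the batch.
        yield [fetch(tid) for tid in ticket_ids[start:start + batch_size]]
```

Because the function is a generator, a caller that stops after the first batch never triggers fetches for the rest of the collection.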

📈 Monitoring & Observability

Metrics Collection

import time
from collections import defaultdict
from typing import Dict, Any

class MCPMetrics:
    """Metrics collection for MCP operations."""

    def __init__(self):
        self.operation_counts = defaultdict(int)
        self.operation_times = defaultdict(list)
        self.error_counts = defaultdict(int)
        self.rate_limit_hits = defaultdict(int)

    def record_operation(self, operation: str, duration: float, success: bool):
        """Record operation metrics."""
        self.operation_counts[operation] += 1
        self.operation_times[operation].append(duration)

        if not success:
            self.error_counts[operation] += 1

    def record_rate_limit(self, operation: str):
        """Record rate limit violation."""
        self.rate_limit_hits[operation] += 1

    def get_summary(self) -> Dict[str, Any]:
        """Get metrics summary."""
        return {
            'total_operations': sum(self.operation_counts.values()),
            'average_response_time': self._calculate_avg_response_time(),
            'error_rate': self._calculate_error_rate(),
            'top_operations': self._get_top_operations(),
            'rate_limit_violations': sum(self.rate_limit_hits.values())
        }

    def _calculate_avg_response_time(self) -> float:
        """Calculate overall average response time."""
        all_times = []
        for times in self.operation_times.values():
            all_times.extend(times)

        return sum(all_times) / len(all_times) if all_times else 0.0

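    def _calculate_error_rate(self) -> float:
        """Calculate overall error rate as a percentage."""
        total_ops = sum(self.operation_counts.values())
        total_errors = sum(self.error_counts.values())

        return (total_errors / total_ops) * 100 if total_ops > 0 else 0.0

    def _get_top_operations(self, limit: int = 5) -> Dict[str, int]:
        """Return the most frequently called operations and their counts."""
        # Assumed implementation: get_summary() calls this helper,
        # but the original listing does not define it.
        ranked = sorted(self.operation_counts.items(),
                        key=lambda item: item[1], reverse=True)
        return dict(ranked[:limit])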

# Global metrics instance
mcp_metrics = MCPMetrics()
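
One way to feed `record_operation` without scattering timing code through every tool is a small context manager. The sketch below is an assumption about how the metrics could be wired in, not the server's actual integration. `timed_operation` is a hypothetical helper that accepts any callable with the `(operation, duration, success)` signature, such as `mcp_metrics.record_operation`.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator

# Matches the signature of MCPMetrics.record_operation.
RecordFn = Callable[[str, float, bool], None]


@contextmanager
def timed_operation(operation: str, record: RecordFn) -> Iterator[None]:
    """Time the enclosed block and report it, even if the block raises."""
    start = time.perf_counter()
    success = False
    try:
        yield
        success = True
    finally:
        # Runs on both success and failure; exceptions still propagate.
        record(operation, time.perf_counter() - start, success)
```

Usage would look like `with timed_operation("list_tickets", mcp_metrics.record_operation): ...`, with any exception inside the block counted as a failed operation.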

Health Checks

from pathlib import Path
from typing import Any, Dict

class HealthChecker:
    """System health monitoring."""

    def __init__(self, working_directory: Path):
        self.working_directory = working_directory
        self.gira_dir = working_directory / '.gira'

    def check_health(self) -> Dict[str, Any]:
        """Comprehensive health check."""
        health = {
            'status': 'healthy',
            'checks': {}
        }

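        # Each _check_* helper returns a dict with 'status' and 'message' keys.
        # Only _check_project_structure is shown in this listing.

        # Check project structure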
        health['checks']['project_structure'] = self._check_project_structure()

        # Check file system permissions
        health['checks']['permissions'] = self._check_permissions()

        # Check data integrity
        health['checks']['data_integrity'] = self._check_data_integrity()

        # Check system resources
        health['checks']['resources'] = self._check_resources()

        # Determine overall status
        if any(check['status'] == 'error' for check in health['checks'].values()):
            health['status'] = 'error'
        elif any(check['status'] == 'warning' for check in health['checks'].values()):
            health['status'] = 'warning'

        return health

    def _check_project_structure(self) -> Dict[str, Any]:
        """Check Gira project structure integrity."""
        required_dirs = ['board', 'backlog', 'epics', 'sprints']
        missing_dirs = []

        for dir_name in required_dirs:
            if not (self.gira_dir / dir_name).exists():
                missing_dirs.append(dir_name)

        if missing_dirs:
            return {
                'status': 'error',
                'message': f'Missing directories: {missing_dirs}'
            }

        return {'status': 'ok', 'message': 'Project structure valid'}
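
`check_health` also calls `_check_permissions`, which is not shown above. As a rough sketch of what such a check might look like, assuming it only needs to confirm that the `.gira` directory is readable and writable, the standalone function below returns the same `status`/`message` shape. The name `check_permissions` and the messages are illustrative assumptions.

```python
import os
from pathlib import Path
from typing import Any, Dict


def check_permissions(gira_dir: Path) -> Dict[str, Any]:
    """Report whether gira_dir exists and is readable and writable."""
    if not gira_dir.is_dir():
        return {'status': 'error', 'message': f'Not a directory: {gira_dir}'}

    # os.access checks the permissions of the current process's user.
    if not os.access(gira_dir, os.R_OK | os.W_OK):
        return {
            'status': 'error',
            'message': f'Insufficient permissions on {gira_dir}'
        }

    return {'status': 'ok', 'message': 'Permissions valid'}
```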

This document gives developers the technical foundation needed to understand, extend, and maintain the Gira MCP server.