Metrics API Reference

Overview

The Metrics API provides functionality for collecting, storing, and analyzing execution metrics from Ralph Orchestrator runs.
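
A minimal end-to-end sketch of how the pieces fit together (collection, then analysis) is shown below; the import path is illustrative, not a documented module name.

# Sketch only: the import path is an assumption, adjust to your installation.
from ralph_orchestrator.metrics import MetricsCollector, MetricsAnalyzer

collector = MetricsCollector()              # defaults to .agent/metrics
collector.start_iteration(1)
# ... run one orchestrator iteration ...
collector.end_iteration(1, success=True, output_size=1024)
path = collector.save()                     # metrics_<timestamp>.json
print(f"Metrics saved to {path}")

analyzer = MetricsAnalyzer('.agent/metrics')
print(analyzer.generate_report())           # report for the latest metrics file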

Metrics Collection

MetricsCollector Class

import json
import os
import sys
import time
import traceback
from datetime import datetime

class MetricsCollector:
    """
    Collects and manages execution metrics.

    Example:
        collector = MetricsCollector()
        collector.start_iteration(1)
        # ... execution ...
        collector.end_iteration(1, success=True)
        collector.save()
    """

    def __init__(self, metrics_dir: str = '.agent/metrics'):
        """
        Initialize metrics collector.

        Args:
            metrics_dir (str): Directory to store metrics files
        """
        self.metrics_dir = metrics_dir
        self.current_metrics = {
            'start_time': None,
            'iterations': [],
            'errors': [],
            'checkpoints': [],
            'agent_executions': [],
            'resource_usage': []
        }
        self._ensure_metrics_dir()

    def _ensure_metrics_dir(self):
        """Create metrics directory if it doesn't exist."""
        os.makedirs(self.metrics_dir, exist_ok=True)

    def start_iteration(self, iteration_num: int):
        """
        Mark the start of an iteration.

        Args:
            iteration_num (int): Iteration number

        Example:
            collector.start_iteration(1)
        """
        self.current_iteration = {
            'number': iteration_num,
            'start_time': time.time(),
            'end_time': None,
            'duration': None,
            'success': False,
            'output_size': 0,
            'errors': []
        }

    def end_iteration(self, iteration_num: int, 
                     success: bool = True, 
                     output_size: int = 0):
        """
        Mark the end of an iteration.

        Args:
            iteration_num (int): Iteration number
            success (bool): Whether iteration succeeded
            output_size (int): Size of output in bytes

        Example:
            collector.end_iteration(1, success=True, output_size=2048)
        """
        if hasattr(self, 'current_iteration'):
            self.current_iteration['end_time'] = time.time()
            self.current_iteration['duration'] = (
                self.current_iteration['end_time'] - 
                self.current_iteration['start_time']
            )
            self.current_iteration['success'] = success
            self.current_iteration['output_size'] = output_size

            self.current_metrics['iterations'].append(self.current_iteration)
            del self.current_iteration

    def record_error(self, error: str, iteration: int = None):
        """
        Record an error.

        Args:
            error (str): Error message
            iteration (int): Iteration number where error occurred

        Example:
            collector.record_error("Agent timeout", iteration=5)
        """
        error_record = {
            'timestamp': time.time(),
            'iteration': iteration,
            'message': error,
            'traceback': traceback.format_exc() if sys.exc_info()[0] else None
        }

        self.current_metrics['errors'].append(error_record)

        if hasattr(self, 'current_iteration'):
            self.current_iteration['errors'].append(error)

    def record_checkpoint(self, iteration: int, commit_hash: str = None):
        """
        Record a checkpoint.

        Args:
            iteration (int): Iteration number
            commit_hash (str): Git commit hash

        Example:
            collector.record_checkpoint(5, "abc123def")
        """
        checkpoint = {
            'iteration': iteration,
            'timestamp': time.time(),
            'commit_hash': commit_hash
        }
        self.current_metrics['checkpoints'].append(checkpoint)

    def record_agent_execution(self, agent: str, 
                              duration: float, 
                              success: bool):
        """
        Record agent execution details.

        Args:
            agent (str): Agent name
            duration (float): Execution duration in seconds
            success (bool): Whether execution succeeded

        Example:
            collector.record_agent_execution('claude', 45.3, True)
        """
        execution = {
            'agent': agent,
            'duration': duration,
            'success': success,
            'timestamp': time.time()
        }
        self.current_metrics['agent_executions'].append(execution)

    def record_resource_usage(self):
        """
        Record current resource usage.

        Example:
            collector.record_resource_usage()
        """
        import psutil

        process = psutil.Process()
        usage = {
            'timestamp': time.time(),
            'cpu_percent': process.cpu_percent(),
            'memory_mb': process.memory_info().rss / 1024 / 1024,
            'disk_io': process.io_counters()._asdict() if hasattr(process, 'io_counters') else {},
            'open_files': len(process.open_files()) if hasattr(process, 'open_files') else 0
        }
        self.current_metrics['resource_usage'].append(usage)

    def save(self, filename: str = None):
        """
        Save metrics to file.

        Args:
            filename (str): Custom filename; auto-generated from a timestamp if omitted

        Returns:
            str: Path to saved metrics file

        Example:
            path = collector.save()
            print(f"Metrics saved to {path}")
        """
        if not filename:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"metrics_{timestamp}.json"

        filepath = os.path.join(self.metrics_dir, filename)

        with open(filepath, 'w') as f:
            json.dump(self.current_metrics, f, indent=2, default=str)

        return filepath
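
A fuller usage sketch than the class docstring, exercising errors, checkpoints, agent executions, and resource sampling (the agent name, durations, and commit hash are illustrative values):

collector = MetricsCollector()

for n in range(1, 4):
    collector.start_iteration(n)
    try:
        # ... invoke the agent for this iteration ...
        collector.record_agent_execution('claude', duration=45.3, success=True)
        collector.end_iteration(n, success=True, output_size=2048)
    except Exception as exc:
        collector.record_error(str(exc), iteration=n)
        collector.end_iteration(n, success=False)

collector.record_checkpoint(3, commit_hash='abc123def')
collector.record_resource_usage()           # requires psutil
print(collector.save())                     # path to metrics_<timestamp>.json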

State Management

StateManager Class

import json
import os
import time
from datetime import datetime

class StateManager:
    """
    Manages Ralph execution state.

    Example:
        state = StateManager()
        state.update(iteration=5, status='running')
        state.save()
    """

    def __init__(self, state_dir: str = '.agent/metrics'):
        """
        Initialize state manager.

        Args:
            state_dir (str): Directory for state files
        """
        self.state_dir = state_dir
        self.state_file = os.path.join(state_dir, 'state_latest.json')
        self.state = self.load() or self.initialize_state()

    def initialize_state(self) -> dict:
        """
        Initialize empty state.

        Returns:
            dict: Initial state structure
        """
        return {
            'status': 'idle',
            'iteration_count': 0,
            'start_time': None,
            'last_update': None,
            'runtime': 0,
            'agent': None,
            'prompt_file': None,
            'errors': [],
            'checkpoints': [],
            'task_complete': False
        }

    def load(self) -> dict:
        """
        Load state from file.

        Returns:
            dict: Loaded state, or None if no valid state file exists

        Example:
            state = StateManager()
            current = state.load()
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file) as f:
                    return json.load(f)
            except json.JSONDecodeError:
                return None
        return None

    def save(self):
        """
        Save current state to file.

        Example:
            state.update(iteration=10)
            state.save()
        """
        os.makedirs(self.state_dir, exist_ok=True)

        # Update last_update timestamp
        self.state['last_update'] = time.time()

        # Save to latest file
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f, indent=2, default=str)

        # Also save timestamped version
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        archive_file = os.path.join(
            self.state_dir, 
            f"state_{timestamp}.json"
        )
        with open(archive_file, 'w') as f:
            json.dump(self.state, f, indent=2, default=str)

    def update(self, **kwargs):
        """
        Update state values.

        Args:
            **kwargs: State values to update

        Example:
            state.update(
                iteration_count=5,
                status='running',
                agent='claude'
            )
        """
        self.state.update(kwargs)

        # Calculate runtime if start_time exists
        if self.state.get('start_time'):
            self.state['runtime'] = time.time() - self.state['start_time']

    def get(self, key: str, default=None):
        """
        Get state value.

        Args:
            key (str): State key
            default: Default value if key not found

        Returns:
            Value from state

        Example:
            iteration = state.get('iteration_count', 0)
        """
        return self.state.get(key, default)

    def reset(self):
        """
        Reset state to initial values.

        Example:
            state.reset()
        """
        self.state = self.initialize_state()
        self.save()
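
A typical lifecycle sketch, from starting a run to recording completion; the field values (agent, prompt file) are illustrative:

import time

state = StateManager()

# Begin a run; setting start_time lets update() derive runtime on later calls.
state.update(status='running', start_time=time.time(),
             agent='claude', prompt_file='PROMPT.md')
state.save()

# Per iteration: bump the counter and persist.
state.update(iteration_count=state.get('iteration_count', 0) + 1)
state.save()

# Finish the run.
state.update(status='complete', task_complete=True)
state.save()

print(state.get('runtime'))                 # seconds since start_time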

Metrics Analysis

MetricsAnalyzer Class

import glob
import json
import os
from typing import List

class MetricsAnalyzer:
    """
    Analyze collected metrics.

    Example:
        analyzer = MetricsAnalyzer('.agent/metrics')
        report = analyzer.generate_report()
        print(report)
    """

    def __init__(self, metrics_dir: str = '.agent/metrics'):
        """
        Initialize metrics analyzer.

        Args:
            metrics_dir (str): Directory containing metrics files
        """
        self.metrics_dir = metrics_dir
        self.metrics_files = self.find_metrics_files()

    def find_metrics_files(self) -> List[str]:
        """
        Find all metrics files.

        Returns:
            list: Paths to metrics files
        """
        pattern = os.path.join(self.metrics_dir, 'metrics_*.json')
        return sorted(glob.glob(pattern))

    def load_metrics(self, filepath: str) -> dict:
        """
        Load metrics from file.

        Args:
            filepath (str): Path to metrics file

        Returns:
            dict: Loaded metrics
        """
        with open(filepath) as f:
            return json.load(f)

    def analyze_iterations(self, metrics: dict) -> dict:
        """
        Analyze iteration performance.

        Args:
            metrics (dict): Metrics data

        Returns:
            dict: Iteration analysis

        Example:
            metrics = analyzer.load_metrics('metrics.json')
            analysis = analyzer.analyze_iterations(metrics)
        """
        iterations = metrics.get('iterations', [])

        if not iterations:
            return {}

        durations = [i['duration'] for i in iterations if i.get('duration')]
        success_count = sum(1 for i in iterations if i.get('success'))

        return {
            'total_iterations': len(iterations),
            'successful_iterations': success_count,
            'success_rate': success_count / len(iterations) if iterations else 0,
            'avg_duration': sum(durations) / len(durations) if durations else 0,
            'min_duration': min(durations) if durations else 0,
            'max_duration': max(durations) if durations else 0,
            'total_duration': sum(durations)
        }

    def analyze_errors(self, metrics: dict) -> dict:
        """
        Analyze errors.

        Args:
            metrics (dict): Metrics data

        Returns:
            dict: Error analysis
        """
        errors = metrics.get('errors', [])

        if not errors:
            return {'total_errors': 0}

        # Group errors by iteration; errors recorded outside an iteration
        # (iteration is None) are grouped under 'unknown'
        errors_by_iteration = {}
        for error in errors:
            iteration = error.get('iteration')
            if iteration is None:
                iteration = 'unknown'
            if iteration not in errors_by_iteration:
                errors_by_iteration[iteration] = []
            errors_by_iteration[iteration].append(error['message'])

        return {
            'total_errors': len(errors),
            'errors_by_iteration': errors_by_iteration,
            'unique_errors': len(set(e['message'] for e in errors))
        }

    def analyze_resource_usage(self, metrics: dict) -> dict:
        """
        Analyze resource usage.

        Args:
            metrics (dict): Metrics data

        Returns:
            dict: Resource usage analysis
        """
        usage = metrics.get('resource_usage', [])

        if not usage:
            return {}

        cpu_values = [u['cpu_percent'] for u in usage if 'cpu_percent' in u]
        memory_values = [u['memory_mb'] for u in usage if 'memory_mb' in u]

        return {
            'avg_cpu_percent': sum(cpu_values) / len(cpu_values) if cpu_values else 0,
            'max_cpu_percent': max(cpu_values) if cpu_values else 0,
            'avg_memory_mb': sum(memory_values) / len(memory_values) if memory_values else 0,
            'max_memory_mb': max(memory_values) if memory_values else 0
        }

    def generate_report(self) -> str:
        """
        Generate comprehensive metrics report.

        Returns:
            str: Formatted report

        Example:
            report = analyzer.generate_report()
            print(report)
        """
        if not self.metrics_files:
            return "No metrics files found"

        # Load latest metrics
        latest_file = self.metrics_files[-1]
        metrics = self.load_metrics(latest_file)

        # Analyze different aspects
        iteration_analysis = self.analyze_iterations(metrics)
        error_analysis = self.analyze_errors(metrics)
        resource_analysis = self.analyze_resource_usage(metrics)

        # Format report
        report = []
        report.append("=" * 50)
        report.append("RALPH ORCHESTRATOR METRICS REPORT")
        report.append("=" * 50)
        report.append(f"Metrics File: {os.path.basename(latest_file)}")
        report.append("")

        # Iteration statistics
        report.append("ITERATION STATISTICS:")
        report.append(f"  Total Iterations: {iteration_analysis.get('total_iterations', 0)}")
        report.append(f"  Success Rate: {iteration_analysis.get('success_rate', 0):.1%}")
        report.append(f"  Avg Duration: {iteration_analysis.get('avg_duration', 0):.2f}s")
        report.append(f"  Total Runtime: {iteration_analysis.get('total_duration', 0):.2f}s")
        report.append("")

        # Error statistics
        report.append("ERROR STATISTICS:")
        report.append(f"  Total Errors: {error_analysis.get('total_errors', 0)}")
        report.append(f"  Unique Errors: {error_analysis.get('unique_errors', 0)}")
        report.append("")

        # Resource usage
        if resource_analysis:
            report.append("RESOURCE USAGE:")
            report.append(f"  Avg CPU: {resource_analysis.get('avg_cpu_percent', 0):.1f}%")
            report.append(f"  Max CPU: {resource_analysis.get('max_cpu_percent', 0):.1f}%")
            report.append(f"  Avg Memory: {resource_analysis.get('avg_memory_mb', 0):.1f} MB")
            report.append(f"  Max Memory: {resource_analysis.get('max_memory_mb', 0):.1f} MB")

        return "\n".join(report)

Metrics Export

Export Functions

import json

def export_to_csv(metrics: dict, output_file: str):
    """
    Export metrics to CSV.

    Args:
        metrics (dict): Metrics data
        output_file (str): Output CSV file path

    Example:
        metrics = load_metrics('metrics.json')
        export_to_csv(metrics, 'metrics.csv')
    """
    import csv

    iterations = metrics.get('iterations', [])

    with open(output_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=[
            'number', 'start_time', 'end_time',
            'duration', 'success', 'output_size'
        ], extrasaction='ignore')  # iteration records also carry an 'errors' list
        writer.writeheader()
        writer.writerows(iterations)

def export_to_prometheus(metrics: dict, output_file: str):
    """
    Export metrics in Prometheus format.

    Args:
        metrics (dict): Metrics data
        output_file (str): Output file path

    Example:
        export_to_prometheus(metrics, 'metrics.prom')
    """
    lines = []

    # Iteration metrics
    iterations = metrics.get('iterations', [])
    if iterations:
        total = len(iterations)
        successful = sum(1 for i in iterations if i.get('success'))

        lines.append(f'ralph_iterations_total {total}')
        lines.append(f'ralph_iterations_successful {successful}')
        lines.append(f'ralph_success_rate {successful/total if total else 0}')

    # Error metrics
    errors = metrics.get('errors', [])
    lines.append(f'ralph_errors_total {len(errors)}')

    # Write to file
    with open(output_file, 'w') as f:
        f.write('\n'.join(lines))
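
For a metrics file with, say, 10 iterations (9 successful) and 2 recorded errors, the generated file contains plain Prometheus exposition lines such as:

ralph_iterations_total 10
ralph_iterations_successful 9
ralph_success_rate 0.9
ralph_errors_total 2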

def export_to_json_lines(metrics: dict, output_file: str):
    """
    Export metrics as JSON lines for streaming.

    Args:
        metrics (dict): Metrics data
        output_file (str): Output file path

    Example:
        export_to_json_lines(metrics, 'metrics.jsonl')
    """
    with open(output_file, 'w') as f:
        for iteration in metrics.get('iterations', []):
            f.write(json.dumps(iteration) + '\n')
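
A short sketch combining the three exporters on a previously saved metrics file (the filename is illustrative):

import json

with open('.agent/metrics/metrics_20240101_120000.json') as f:
    metrics = json.load(f)

export_to_csv(metrics, 'metrics.csv')            # one row per iteration
export_to_prometheus(metrics, 'metrics.prom')    # Prometheus exposition format
export_to_json_lines(metrics, 'metrics.jsonl')   # one JSON object per line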

Real-time Metrics

RealtimeMetrics Class

import threading
import time

import psutil

class RealtimeMetrics:
    """
    Provide real-time metrics access.

    Example:
        realtime = RealtimeMetrics()
        realtime.start_monitoring()
        # ... execution ...
        stats = realtime.get_current_stats()
    """

    def __init__(self):
        self.current_stats = {}
        self.monitoring = False

    def start_monitoring(self):
        """Start real-time monitoring."""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _monitor_loop(self):
        """Background monitoring loop."""
        while self.monitoring:
            self.current_stats = {
                'timestamp': time.time(),
                'cpu_percent': psutil.cpu_percent(interval=1),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_usage': psutil.disk_usage('.').percent
            }
            time.sleep(5)

    def get_current_stats(self) -> dict:
        """Get current statistics."""
        return self.current_stats.copy()

    def stop_monitoring(self):
        """Stop monitoring."""
        self.monitoring = False
        if hasattr(self, 'monitor_thread'):
            self.monitor_thread.join(timeout=1)
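
A minimal usage sketch; the background loop samples roughly every few seconds, so the first stats may be empty until a cycle completes:

import time

realtime = RealtimeMetrics()
realtime.start_monitoring()

# ... run the orchestrator ...
time.sleep(10)                                   # allow at least one sampling cycle

stats = realtime.get_current_stats()
print(f"CPU {stats.get('cpu_percent', 0):.1f}%, "
      f"memory {stats.get('memory_percent', 0):.1f}%")

realtime.stop_monitoring()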