"""LLM Benchmarking Script Measures token usage, costs, and timing for cancer risk assessments across different LLM backends. """ import argparse import csv import functools import os import time from collections import defaultdict from collections.abc import Callable from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any import requests import yaml from dotenv import load_dotenv from langchain_community.callbacks.manager import get_openai_callback from loguru import logger from reportlab.lib import colors from reportlab.lib.pagesizes import letter from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.units import inch from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle from sentinel.config import AppConfig, ModelConfig, ResourcePaths from sentinel.factory import SentinelFactory from sentinel.utils import load_user_file load_dotenv() @dataclass class ModelPricing: """Pricing per 1 million tokens in USD. Attributes: input_per_million: Cost per 1M input tokens (USD) output_per_million: Cost per 1M output tokens (USD) """ input_per_million: float output_per_million: float @dataclass class BenchmarkModelConfig: """Model configuration for benchmarking. Attributes: provider: Provider key (google, openai, local) model_name: Model identifier used by the provider pricing: Pricing information per 1M tokens """ provider: str model_name: str pricing: ModelPricing # Sources: # - https://ai.google.dev/pricing # - https://openai.com/api/pricing/ BENCHMARK_MODELS = [ BenchmarkModelConfig( provider="google", model_name="gemini-2.5-pro", pricing=ModelPricing(input_per_million=1.25, output_per_million=10.00), ), BenchmarkModelConfig( provider="google", model_name="gemini-2.5-flash-lite", pricing=ModelPricing(input_per_million=0.1, output_per_million=0.4), ), ] @dataclass class TokenUsage: """Token usage statistics for a single assessment. Attributes: input_tokens: Tokens in the prompt/input output_tokens: Tokens in the model's response """ input_tokens: int output_tokens: int @property def total_tokens(self) -> int: """Total tokens used. Returns: Sum of input and output tokens """ return self.input_tokens + self.output_tokens @dataclass class BenchmarkResult: """Results from a single model/profile benchmark run. Attributes: model_name: Name of the model provider: Provider key (openai, google, local) profile_name: Name of the profile token_usage: Token usage statistics cost: Cost in USD assessment_time_seconds: Time taken for assessment in seconds """ model_name: str provider: str profile_name: str token_usage: TokenUsage cost: float assessment_time_seconds: float def calculate_cost(token_usage: TokenUsage, pricing: ModelPricing) -> float: """Calculate cost based on token usage and model pricing. Args: token_usage: Token usage statistics pricing: Model pricing per 1M tokens Returns: Cost in USD """ input_cost = (token_usage.input_tokens / 1_000_000) * pricing.input_per_million output_cost = (token_usage.output_tokens / 1_000_000) * pricing.output_per_million return input_cost + output_cost def validate_directory_input(func: Callable[..., Any]) -> Callable[..., Any]: """Decorator to validate directory argument. Args: func: Function to decorate Returns: Decorated function that validates directory input """ @functools.wraps(func) def wrapper(directory: Path, *args: Any, **kwargs: Any) -> Any: """Wrapper function to validate directory input. Args: directory: Path to directory to validate *args: Additional positional arguments **kwargs: Additional keyword arguments Returns: Result of the wrapped function Raises: FileNotFoundError: If the directory does not exist NotADirectoryError: If the path is not a directory ValueError: If the directory is empty """ if not directory.exists(): raise FileNotFoundError(f"Directory not found: {directory}") if not directory.is_dir(): raise NotADirectoryError(f"Not a directory: {directory}") if not any(directory.iterdir()): raise ValueError(f"Directory is empty: {directory}") return func(directory, *args, **kwargs) return wrapper def get_available_models() -> list[BenchmarkModelConfig]: """Get list of available models for benchmarking. Returns: List of configured benchmark models """ return BENCHMARK_MODELS @validate_directory_input def load_benchmark_profiles(benchmark_dir: Path) -> list[dict[str, Any]]: """Load benchmark profiles. Args: benchmark_dir: Directory containing benchmark YAML files Returns: List of dicts with 'name' and 'path' keys """ profiles = [] for yaml_file in sorted(benchmark_dir.glob("*.yaml")): profiles.append({"name": yaml_file.stem, "path": yaml_file}) return profiles def create_knowledge_base_paths(workspace_root: Path) -> ResourcePaths: """Build resource path configuration from workspace root. Args: workspace_root: Path to workspace root directory Returns: ResourcePaths configuration object """ return ResourcePaths( persona=workspace_root / "prompts/persona/default.md", instruction_assessment=workspace_root / "prompts/instruction/assessment.md", instruction_conversation=workspace_root / "prompts/instruction/conversation.md", output_format_assessment=workspace_root / "configs/output_format/assessment.yaml", output_format_conversation=workspace_root / "configs/output_format/conversation.yaml", cancer_modules_dir=workspace_root / "configs/knowledge_base/cancer_modules", dx_protocols_dir=workspace_root / "configs/knowledge_base/dx_protocols", ) def validate_backend(provider: str, model_name: str) -> None: """Validate that backend is accessible. Args: provider: Provider key (e.g. "openai", "google", "local") model_name: Model identifier Raises: ValueError: If the backend is not accessible """ if provider == "openai": if not os.getenv("OPENAI_API_KEY"): raise ValueError("OPENAI_API_KEY not set") elif provider == "google": if not os.getenv("GOOGLE_API_KEY"): raise ValueError("GOOGLE_API_KEY not set") elif provider == "local": ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") response = requests.get(f"{ollama_base_url}/api/tags", timeout=2) if response.status_code != 200: raise ValueError("Ollama server not responding") models = response.json().get("models", []) model_names = [m.get("name") for m in models] if model_name not in model_names: raise ValueError(f"Model not found. Run: ollama pull {model_name}") def run_assessment( model_config: BenchmarkModelConfig, profile_path: Path ) -> BenchmarkResult: """Run a single assessment and capture token usage. Args: model_config: Model configuration with pricing profile_path: Path to profile YAML file Returns: BenchmarkResult with cost and token usage """ validate_backend(model_config.provider, model_config.model_name) workspace_root = Path(__file__).parent.parent with open(workspace_root / "configs/config.yaml") as f: default_config = yaml.safe_load(f) app_config = AppConfig( model=ModelConfig( provider=model_config.provider, model_name=model_config.model_name, ), knowledge_base_paths=create_knowledge_base_paths(workspace_root), selected_cancer_modules=default_config["knowledge_base"]["cancer_modules"], selected_dx_protocols=default_config["knowledge_base"]["dx_protocols"], ) factory = SentinelFactory(app_config) conversation = factory.create_conversation_manager() user = load_user_file(str(profile_path)) start_time = time.perf_counter() with get_openai_callback() as cb: conversation.initial_assessment(user) input_tokens = cb.prompt_tokens output_tokens = cb.completion_tokens end_time = time.perf_counter() assessment_time = end_time - start_time token_usage = TokenUsage(input_tokens, output_tokens) cost = calculate_cost(token_usage, model_config.pricing) return BenchmarkResult( model_name=model_config.model_name, provider=model_config.provider, profile_name=profile_path.stem, token_usage=token_usage, cost=cost, assessment_time_seconds=assessment_time, ) def print_results(results: list[BenchmarkResult]) -> None: """Print formatted results to console. Args: results: List of benchmark results """ by_model = defaultdict(list) for result in results: by_model[result.model_name].append(result) lines = [] lines.append("\n╔══════════════════════════════════════════════════════════════╗") lines.append("║ LLM Cost Benchmark Results ║") lines.append("╚══════════════════════════════════════════════════════════════╝\n") for model_name, model_results in sorted(by_model.items()): provider = model_results[0].provider lines.append(f"Model: {model_name} ({provider})") num_results = len(model_results) avg_cost = sum(result.cost for result in model_results) / num_results avg_input = ( sum(result.token_usage.input_tokens for result in model_results) / num_results ) avg_output = ( sum(result.token_usage.output_tokens for result in model_results) / num_results ) avg_time = ( sum(result.assessment_time_seconds for result in model_results) / num_results ) for result_index, result in enumerate(model_results): is_last = result_index == num_results - 1 prefix = "└─" if is_last else "├─" indent = " " if is_last else "│ " lines.append(f"{prefix} Profile: {result.profile_name}") lines.append(f"{indent}├─ Input: {result.token_usage.input_tokens:,}") lines.append(f"{indent}├─ Output: {result.token_usage.output_tokens:,}") lines.append(f"{indent}├─ Cost: ${result.cost:.4f}") lines.append(f"{indent}└─ Time: {result.assessment_time_seconds:.2f}s") lines.append(f"└─ Average: ${avg_cost:.4f}") lines.append(f" ├─ Tokens: {avg_input:,.0f} input, {avg_output:,.0f} output") lines.append(f" └─ Time: {avg_time:.2f}s\n") lines.append("═══════════════════════════════════════════════════════════════") lines.append("Summary - Model Ranking (Cheapest to Most Expensive)") lines.append("───────────────────────────────────────────────────────────────") model_averages = sorted( ( ( model_name, sum(result.cost for result in model_results) / len(model_results), ) for model_name, model_results in by_model.items() ), key=lambda model_avg_tuple: model_avg_tuple[1], ) for rank, (model_name, avg_cost) in enumerate(model_averages, 1): prefix = ( "🥇" if rank == 1 else "🥈" if rank == 2 else "🥉" if rank == 3 else f"{rank}." ) lines.append(f"{prefix:<4} {model_name:<25} ${avg_cost:.4f}") lines.append("\n═══════════════════════════════════════════════════════════════") lines.append("Summary - Timing Performance (Fastest to Slowest)") lines.append("───────────────────────────────────────────────────────────────") model_timing = sorted( ( ( model_name, sum(result.assessment_time_seconds for result in model_results) / len(model_results), ) for model_name, model_results in by_model.items() ), key=lambda model_time_tuple: model_time_tuple[1], ) for rank, (model_name, avg_time) in enumerate(model_timing, 1): prefix = ( "🥇" if rank == 1 else "🥈" if rank == 2 else "🥉" if rank == 3 else f"{rank}." ) lines.append(f"{prefix:<4} {model_name:<25} {avg_time:.2f}s") lines.append(f"\nTotal: {len(results)} assessments across {len(by_model)} models") lines.append("═══════════════════════════════════════════════════════════════\n") logger.info("\n".join(lines)) def export_to_csv(results: list[BenchmarkResult], output_path: Path) -> None: """Export results to CSV file. Args: results: List of benchmark results output_path: Path to output CSV file """ with open(output_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow( [ "model_name", "provider", "profile_name", "input_tokens", "output_tokens", "total_tokens", "cost_usd", "assessment_time_seconds", ] ) for result in results: writer.writerow( [ result.model_name, result.provider, result.profile_name, result.token_usage.input_tokens, result.token_usage.output_tokens, result.token_usage.total_tokens, f"{result.cost:.6f}", f"{result.assessment_time_seconds:.3f}", ] ) logger.success(f"Results exported to: {output_path}") def export_to_pdf( results: list[BenchmarkResult], output_path: Path, ) -> None: """Export results to PDF file with formatted table. Args: results: List of benchmark results output_path: Path to output PDF file """ doc = SimpleDocTemplate( str(output_path), pagesize=letter, leftMargin=0.75 * inch, rightMargin=0.75 * inch, topMargin=0.75 * inch, bottomMargin=0.75 * inch, ) elements = [] styles = getSampleStyleSheet() title = Paragraph( "LLM Benchmark Report", styles["Title"], ) elements.append(title) elements.append(Spacer(1, 0.2 * inch)) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") timestamp_text = Paragraph( f"Generated: {timestamp}", styles["Normal"], ) elements.append(timestamp_text) elements.append(Spacer(1, 0.3 * inch)) by_model = defaultdict(list) for result in results: by_model[result.model_name].append(result) pricing_lookup = {model.model_name: model.pricing for model in BENCHMARK_MODELS} results_desc = Paragraph( "Average cost and timing for running a single cancer risk assessment given a completed patient questionnaire.", styles["Normal"], ) elements.append(results_desc) elements.append(Spacer(1, 0.2 * inch)) table_data = [ [ "Model", "Provider", "Avg Cost\nper Report", "Input Price\n(per 1M)", "Output Price\n(per 1M)", "Avg Input\nTokens", "Avg Output\nTokens", "Avg Time\n(seconds)", ] ] # Sort by average cost (cheapest first) sorted_models = sorted( by_model.items(), key=lambda model_tuple: sum(result.cost for result in model_tuple[1]) / len(model_tuple[1]), ) for model_name, model_results in sorted_models: provider = model_results[0].provider num_results = len(model_results) avg_cost = sum(result.cost for result in model_results) / num_results avg_input = ( sum(result.token_usage.input_tokens for result in model_results) / num_results ) avg_output = ( sum(result.token_usage.output_tokens for result in model_results) / num_results ) avg_time = ( sum(result.assessment_time_seconds for result in model_results) / num_results ) pricing = pricing_lookup.get(model_name) input_price = f"${pricing.input_per_million:.2f}" if pricing else "N/A" output_price = f"${pricing.output_per_million:.2f}" if pricing else "N/A" table_data.append( [ model_name, provider, f"${avg_cost:.4f}", input_price, output_price, f"{avg_input:,.0f}", f"{avg_output:,.0f}", f"{avg_time:.1f}", ] ) table = Table( table_data, colWidths=[ 1.4 * inch, 0.75 * inch, 0.8 * inch, 0.75 * inch, 0.75 * inch, 0.7 * inch, 0.7 * inch, 0.65 * inch, ], ) table_style = TableStyle( [ # Header styling ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#4A90E2")), ("TEXTCOLOR", (0, 0), (-1, 0), colors.whitesmoke), ("ALIGN", (0, 0), (-1, 0), "CENTER"), ("VALIGN", (0, 0), (-1, 0), "MIDDLE"), ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), ("FONTSIZE", (0, 0), (-1, 0), 8), ("BOTTOMPADDING", (0, 0), (-1, 0), 10), ("TOPPADDING", (0, 0), (-1, 0), 10), # Data rows styling ("BACKGROUND", (0, 1), (-1, -1), colors.beige), ("TEXTCOLOR", (0, 1), (-1, -1), colors.black), ("ALIGN", (0, 1), (1, -1), "LEFT"), ("ALIGN", (2, 1), (-1, -1), "CENTER"), ("VALIGN", (0, 1), (-1, -1), "MIDDLE"), ("FONTNAME", (0, 1), (-1, -1), "Helvetica"), ("FONTSIZE", (0, 1), (-1, -1), 8), ("TOPPADDING", (0, 1), (-1, -1), 7), ("BOTTOMPADDING", (0, 1), (-1, -1), 7), # Alternating row colors ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.beige, colors.lightgrey]), # Grid ("GRID", (0, 0), (-1, -1), 1, colors.black), ] ) table.setStyle(table_style) elements.append(table) elements.append(Spacer(1, 0.3 * inch)) doc.build(elements) logger.success(f"PDF report generated: {output_path}") def parse_args() -> argparse.Namespace: """Parse command-line arguments. Returns: Parsed command-line arguments """ workspace_root = Path(__file__).parent.parent parser = argparse.ArgumentParser(description="Benchmark LLM costs") parser.add_argument( "--benchmark-dir", type=Path, default=workspace_root / "examples/benchmark", help="Benchmark profile directory", ) parser.add_argument( "--models", nargs="+", help="Specific models to test (by name)", ) parser.add_argument( "--profiles", nargs="+", help="Specific profiles to test", ) parser.add_argument( "--output", type=Path, help="Export to CSV", ) return parser.parse_args() def main() -> None: """Main entry point. Raises: ValueError: If no matching models or profiles found """ args = parse_args() logger.info("Loading benchmark configuration...") all_models = get_available_models() logger.info("Loading profiles...") all_profiles = load_benchmark_profiles(args.benchmark_dir) if args.models: all_models = [model for model in all_models if model.model_name in args.models] if not all_models: raise ValueError(f"No matching models: {args.models}") if args.profiles: all_profiles = [ profile for profile in all_profiles if profile["name"] in args.profiles ] if not all_profiles: raise ValueError(f"No matching profiles: {args.profiles}") logger.info( f"\nRunning {len(all_models)} model(s) x {len(all_profiles)} profile(s)...\n" ) results = [] for model_index, model in enumerate(all_models, 1): for profile in all_profiles: logger.info( f"[{model_index}/{len(all_models)}] {model.model_name}: {profile['name']}" ) result = run_assessment(model, profile["path"]) results.append(result) print_results(results) # Generate PDF report with timestamp workspace_root = Path(__file__).parent.parent outputs_dir = workspace_root / "outputs" outputs_dir.mkdir(exist_ok=True) timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") pdf_path = outputs_dir / f"llm_benchmark_{timestamp}.pdf" export_to_pdf(results, pdf_path) if args.output: export_to_csv(results, args.output) if __name__ == "__main__": main()