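"""
Usage tracking for a FastAPI service.

UsageTracker records each API request (timestamp, model, endpoint, client IP,
user agent) to a JSON file, keeps aggregated per-model and per-endpoint
statistics, and can summarize activity over a recent window of days.
"""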
import json
import os
import datetime
import threading
from collections import defaultdict
from typing import Any, Dict, Optional
from fastapi import Request

class UsageTracker:
    """Tracks API usage and persists it to a JSON file at regular intervals."""

    def __init__(self, data_file: str = "usage_data.json"):
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """
        Loads usage data from the JSON file, ensuring data integrity.
        Handles cases where the file might be corrupted or in an old format.
        """
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                # Data already in the expected new format
                if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data:
                    return self._restore_defaultdicts(data)
                # Older, simpler format: convert it
                elif isinstance(data, dict) and 'total_requests' in data:  # Heuristic for the old format
                    return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError) as e:
                print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.")
        return self._initialize_empty_data()

    def _restore_defaultdicts(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Rewraps the aggregate maps as defaultdicts. json.load produces plain
        dicts, which would raise KeyError in record_request for any model or
        endpoint not seen before the reload.
        """
        restored = self._initialize_empty_data()
        restored['requests'] = data.get('requests', [])
        restored['models'].update(data.get('models', {}))
        restored['api_endpoints'].update(data.get('api_endpoints', {}))
        return restored

    def _initialize_empty_data(self) -> Dict[str, Any]:
        """
        Initializes a new, empty data structure for usage tracking.
        This structure includes a list for all requests, and dictionaries
        to store aggregated data for models and API endpoints.
        """
        return {
            'requests': [],
            'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}),
            'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None})
        }
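
    # For reference, the file written by save_data() then looks roughly like
    # this (an illustrative sketch; the model, endpoint, and values are made up):
    #
    #   {
    #       "requests": [
    #           {"timestamp": "2024-01-01T12:00:00+00:00", "model": "example-model",
    #            "endpoint": "/v1/chat", "ip_address": "203.0.113.7",
    #            "user_agent": "curl/8.0"}
    #       ],
    #       "models": {"example-model": {"total_requests": 1,
    #                                    "first_used": "2024-01-01T12:00:00+00:00",
    #                                    "last_used": "2024-01-01T12:00:00+00:00"}},
    #       "api_endpoints": {"/v1/chat": {"total_requests": 1, ...}}
    #   }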

    @staticmethod
    def _parse_ts(ts_str: str) -> datetime.datetime:
        """Parses an ISO timestamp, assuming UTC when no timezone is attached."""
        ts = datetime.datetime.fromisoformat(ts_str)
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=datetime.timezone.utc)
        return ts

    def _touch_stats(self, stats: Dict[str, Any], ts: datetime.datetime) -> None:
        """Folds one request at time `ts` into a first/last-used stats entry."""
        stats['total_requests'] += 1
        if stats['first_used'] is None or ts < self._parse_ts(stats['first_used']):
            stats['first_used'] = ts.isoformat()
        if stats['last_used'] is None or ts > self._parse_ts(stats['last_used']):
            stats['last_used'] = ts.isoformat()

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Converts data from the old format to the new detailed format.
        This is a crucial step to avoid data loss on updates.
        It iterates through old 'requests' (if any) and re-records them
        into the new structured format.
        """
        print("Converting old usage data format to new format.")
        new_data = self._initialize_empty_data()
        # Preserve existing requests if they follow a basic structure
        if 'requests' in old_data and isinstance(old_data['requests'], list):
            for req in old_data['requests']:
                # Extract the relevant fields from the old request entry
                timestamp_str = req.get('timestamp')
                model_name = req.get('model', 'unknown_model')
                endpoint_name = req.get('endpoint', 'unknown_endpoint')
                ip_address = req.get('ip_address', 'N/A')
                user_agent = req.get('user_agent', 'N/A')
                # Fall back to "now" if the timestamp is missing or malformed
                try:
                    timestamp = self._parse_ts(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc)
                except ValueError:
                    timestamp = datetime.datetime.now(datetime.timezone.utc)
                new_data['requests'].append({
                    'timestamp': timestamp.isoformat(),
                    'model': model_name,
                    'endpoint': endpoint_name,
                    'ip_address': ip_address,
                    'user_agent': user_agent,
                })
                # Update aggregated stats for models and endpoints, so that
                # even old data contributes to the new summary
                self._touch_stats(new_data['models'][model_name], timestamp)
                self._touch_stats(new_data['api_endpoints'][endpoint_name], timestamp)
        print("Data conversion complete.")
        return new_data

    def save_data(self):
        """Saves current usage data to the JSON file."""
        with self.lock:
            try:
                # Convert defaultdicts to regular dicts for JSON serialization
                serializable_data = {
                    'requests': self.data['requests'],
                    'models': dict(self.data['models']),
                    'api_endpoints': dict(self.data['api_endpoints'])
                }
                # Write to a temp file, then atomically replace, so a crash
                # mid-write cannot leave a truncated/corrupt JSON file.
                tmp_path = self.data_file + ".tmp"
                with open(tmp_path, 'w') as f:
                    json.dump(serializable_data, f, indent=4)
                os.replace(tmp_path, self.data_file)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Saves the data now and schedules the next save in 60 seconds."""
        # A daemon timer will not keep the process alive forever (a non-daemon
        # Timer chain would block interpreter exit). The trade-off is that a
        # save may be skipped at shutdown, so callers should also flush once on
        # exit, e.g. via atexit or a framework shutdown hook.
        timer = threading.Timer(60.0, self._schedule_save)
        timer.daemon = True
        timer.start()
        self.save_data()

    def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"):
        """
        Records a single API request with detailed information.
        Updates both the raw request list and aggregated statistics.
        """
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = request.client.host if request and request.client else "N/A"
            user_agent = request.headers.get("user-agent", "N/A") if request else "N/A"
            # Append to raw requests list
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })
            # Update model- and endpoint-specific stats
            self._touch_stats(self.data['models'][model], now)
            self._touch_stats(self.data['api_endpoints'][endpoint], now)
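
    # Illustrative wiring (a sketch, not part of this class): in a FastAPI app,
    # record_request() might be called from an HTTP middleware so every request
    # is tracked automatically. The `tracker` name below is assumed to be a
    # module-level UsageTracker instance; the model is typically recorded in
    # the route handler instead, where it is actually known.
    #
    #   @app.middleware("http")
    #   async def track_usage(request: Request, call_next):
    #       tracker.record_request(request=request, endpoint=request.url.path)
    #       return await call_next(request)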

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """
        Generates a comprehensive summary of usage data. Total requests and
        unique IPs cover all recorded requests; the model, endpoint, and daily
        breakdowns cover only the last `days` days.
        """
        with self.lock:
            model_usage = defaultdict(int)     # Requests per model for the period
            endpoint_usage = defaultdict(int)  # Requests per endpoint for the period
            daily_usage = defaultdict(lambda: {'requests': 0, 'unique_ips': set()})
            unique_ips_total = set()           # Unique IPs across all requests
            summary = {
                'total_requests': 0,
                'recent_requests': []
            }
            # All-time aggregates for models and endpoints, used for charts and
            # tables alongside the period-limited counts below
            all_time_model_data = {
                model: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for model, stats in self.data['models'].items()
            }
            all_time_endpoint_data = {
                endpoint: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for endpoint, stats in self.data['api_endpoints'].items()
            }
            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            # Iterate newest-first so recent_requests fills with the latest entries
            for req in reversed(self.data['requests']):
                req_time = self._parse_ts(req['timestamp'])
                # Total requests and unique IPs are counted across all time
                summary['total_requests'] += 1
                unique_ips_total.add(req['ip_address'])
                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    # Aggregate per-period data for charts and tables
                    model_usage[req['model']] += 1
                    endpoint_usage[req['endpoint']] += 1
                    daily_usage[date_str]['requests'] += 1
                    daily_usage[date_str]['unique_ips'].add(req['ip_address'])
                    # Keep up to the 20 most recent requests
                    if len(summary['recent_requests']) < 20:
                        summary['recent_requests'].append(req)
            # Report daily unique IPs as counts rather than sets, sorted by date
            daily_usage_out = {}
            for date_str in sorted(daily_usage):
                stats = daily_usage[date_str]
                daily_usage_out[date_str] = {
                    'requests': stats['requests'],
                    'unique_ips_count': len(stats['unique_ips'])
                }
            # Assemble the final, JSON-serializable summary
            summary['model_usage_period'] = dict(model_usage)
            summary['endpoint_usage_period'] = dict(endpoint_usage)
            summary['daily_usage_period'] = daily_usage_out
            summary['all_time_model_usage'] = all_time_model_data
            summary['all_time_endpoint_usage'] = all_time_endpoint_data
            summary['unique_ips_total_count'] = len(unique_ips_total)
            return summary
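

# Minimal usage sketch (illustrative only; the model and endpoint names below
# are hypothetical, and in a real deployment this module would be imported by
# the FastAPI app rather than run directly):
if __name__ == "__main__":
    import atexit

    tracker = UsageTracker(data_file="usage_data.json")
    # Guarantee a final flush on exit, since the periodic save timer is a
    # daemon thread and will not fire once the interpreter shuts down.
    atexit.register(tracker.save_data)

    # Without a live Request object, client IP and user agent fall back to "N/A".
    tracker.record_request(model="example-model", endpoint="/v1/chat/completions")
    print(json.dumps(tracker.get_usage_summary(days=7), indent=2))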