"""Loki API — a FastAPI proxy that fans out OpenAI-style chat/image requests to
several upstream AI providers (Mistral, Pollinations, Gemini, Bytez, plus a few
private endpoints) and serves a small usage-analytics dashboard.
"""

import os
import re
import json
import time
import asyncio
import datetime
import hashlib
import contextlib
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import cloudscraper
import httpx
import requests
import uvloop
from dotenv import load_dotenv
from fastapi import (
    APIRouter,
    Depends,
    FastAPI,
    HTTPException,
    Query,
    Request,
    Security,
)
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import (
    FileResponse,
    HTMLResponse,
    JSONResponse,
    PlainTextResponse,
    StreamingResponse,
)
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.status import HTTP_403_FORBIDDEN

# uvloop is a drop-in, faster event-loop implementation for asyncio.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Thread pool for blocking work (cloudscraper's synchronous HTTP calls).
executor = ThreadPoolExecutor(max_workers=16)

load_dotenv()

api_key_header = APIKeyHeader(name="Authorization", auto_error=False)

# Imported after load_dotenv() in case the tracker reads env vars at import time.
from usage_tracker import UsageTracker

usage_tracker = UsageTracker()

# The built-in docs are disabled; a custom /docs page is served below.
app = FastAPI(docs_url=None, redoc_url=None)

app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@lru_cache(maxsize=1)
def get_env_vars():
    """Read every provider endpoint/key from the environment (cached once)."""
    return {
        'api_keys': os.getenv('API_KEYS', '').split(','),
        'secret_api_endpoint': os.getenv('SECRET_API_ENDPOINT'),
        'secret_api_endpoint_2': os.getenv('SECRET_API_ENDPOINT_2'),
        'secret_api_endpoint_3': os.getenv('SECRET_API_ENDPOINT_3'),
        'secret_api_endpoint_4': os.getenv('SECRET_API_ENDPOINT_4', "https://enter.pollinations.ai/api/generate"),
        'pollinations_key': os.getenv('POLLINATIONS_KEY'),
        'secret_api_endpoint_5': os.getenv('SECRET_API_ENDPOINT_5'),
        'secret_api_endpoint_6': os.getenv('SECRET_API_ENDPOINT_6'),
        'secret_api_endpoint_7': os.getenv('SECRET_API_ENDPOINT_7'),  # Bytez
        'mistral_api': os.getenv('MISTRAL_API', "https://api.mistral.ai"),
        'mistral_key': os.getenv('MISTRAL_KEY'),
        'gemini_key': os.getenv('GEMINI_KEY'),
        'bytez_key': os.getenv('BYTEZ_KEY'),
        'endpoint_origin': os.getenv('ENDPOINT_ORIGIN'),
        'new_img': os.getenv('NEW_IMG'),
    }


# ---------------------------------------------------------------------------
# Model routing tables: the set a model id belongs to decides which upstream
# endpoint (and auth header) the request is proxied to in get_completion().
# ---------------------------------------------------------------------------

mistral_models = {
    "mistral-large-latest",
    "pixtral-large-latest",
    "mistral-moderation-latest",
    "ministral-3b-latest",
    "ministral-8b-latest",
    "open-mistral-nemo",
    "mistral-small-latest",
    "mistral-saba-latest",
    "codestral-latest",
}

pollinations_models = {
    "openai",
    "openai-fast",
    "openai-large",
    "openai-reasoning",
    "qwen-coder",
    "mistral",
    "naughty",
    "deepseek",
    "openai-audio",
    "claude",
    "gemini",
    "gemini-search",
    "unity",
    "midijourney",
    "evil",
    "chickytutor",
    "perplexity-fast",
    "perplexity-reasoning",
}

alternate_models = {
    "o1",
    "llama-4-scout",
    "o4-mini",
    "sonar",
    "sonar-pro",
    "sonar-reasoning",
    "sonar-reasoning-pro",
    "grok-3",
    "grok-3-fast",
    "r1-1776",
    "o3",
}

claude_3_models = {
    "claude-3-7-sonnet",
    "claude-3-7-sonnet-thinking",
    "claude 3.5 haiku",
    "claude 3.5 sonnet",
    "o3-mini-medium",
    "o3-mini-high",
    "grok-3",
    "grok-3-thinking",
    "grok 2",
}

bytez_models = {
    "openai/gpt-4.1-nano",
    "openai/gpt-4o",
    "openai/gpt-4.1",
    "openai/gpt-4.1-mini",
    "openai/gpt-4o-mini",
    "openai/gpt-3.5-turbo",
    "openai/gpt-3.5-turbo-1106",
    "openai/o3",
    "openai/o1",
    "openai/gpt-4",
    "openai/gpt-5-nano",
    "openai/gpt-5-mini",
    "openai/gpt-5",
}

gemini_models = {
    # Gemini 1.5 series
    "gemini-1.5-pro",
    "gemini-1.5-pro-002",
    "gemini-1.5-flash",
    "gemini-1.5-flash-002",
    "gemini-1.5-flash-8b",
    # Gemini 2.0 series
    "gemini-2.0-flash-lite",
    "gemini-2.0-flash-lite-preview",
    "gemini-2.0-flash",
    "gemini-2.0-flash-exp",
    "gemini-2.0-flash-thinking",
    "gemini-2.0-flash-thinking-exp-01-21",
    "gemini-2.0-flash-preview-image-generation",
    "gemini-2.0-pro-exp-02-05",
    # Gemini 2.5 series
    "gemini-2.5-flash",
    "gemini-2.5-flash-preview-05-20",
    "gemini-2.5-flash-preview-native-audio-dialog",
    "gemini-2.5-flash-exp-native-audio-thinking-dialog",
    "gemini-2.5-pro",
    "gemini-2.5-pro-preview-06-05",
    "gemini-2.5-pro-preview-03-25",
    "gemini-2.5-pro-exp-03-25",
    "gemini-2.5-pro-preview-tts",
    "gemini-2.5-flash-preview-tts",
    # Experimental / special models
    "gemini-exp-1206",
    "gemini-embedding-exp-03-07",
}

supported_image_models = {
    "Flux Pro Ultra",
    "grok-2-aurora",
    "Flux Pro",
    "Flux Pro Ultra Raw",
    "Flux Dev",
    "Flux Schnell",
    "stable-diffusion-3-large-turbo",
    "Flux Realism",
    "stable-diffusion-ultra",
    "dall-e-3",
    "sdxl-lightning-4step",
}


class Payload(BaseModel):
    """OpenAI-compatible chat completion request body."""

    model: str
    messages: list
    stream: bool = False
    # Optional tool-calling fields, forwarded to the upstream untouched.
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None


class ImageGenerationPayload(BaseModel):
    """Image generation request body."""

    model: str
    prompt: str
    size: str = "1024x1024"
    number: int = 1


# Global maintenance flag; when False every completion request is rejected.
server_status = True
# Populated at startup from models.json plus the hardcoded model sets above.
available_model_ids: list[str] = []


@lru_cache(maxsize=1)
def get_async_client():
    """Return the shared httpx client (created once, closed on shutdown)."""
    return httpx.AsyncClient(
        timeout=60.0,
        limits=httpx.Limits(max_keepalive_connections=50, max_connections=200),
    )


scraper_pool = []
MAX_SCRAPERS = 20


def get_scraper():
    """Pick a cloudscraper instance from the pool (lazily filled on first use)."""
    if not scraper_pool:
        for _ in range(MAX_SCRAPERS):
            scraper_pool.append(cloudscraper.create_scraper())
    # Cheap pseudo-round-robin keyed on the current millisecond.
    return scraper_pool[int(time.time() * 1000) % MAX_SCRAPERS]


async def verify_api_key(request: Request, api_key: str = None):
    # NOTE(review): authentication is currently a no-op — every request is
    # allowed regardless of the Authorization header. Confirm this is
    # intentional before deploying publicly (API_KEYS is read but never used).
    return True


@lru_cache(maxsize=1)
def load_models_data():
    """Load models.json from the app directory; return [] on any failure."""
    try:
        file_path = Path(__file__).parent / 'models.json'
        with open(file_path, 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"Error loading models.json: {str(e)}")
        return []


@app.get("/api/v1/models")
@app.get("/models")
async def get_models():
    """Return the model catalogue from models.json."""
    models_data = load_models_data()
    if not models_data:
        raise HTTPException(status_code=500, detail="Error loading available models")
    return models_data


async def generate_search_async(query: str, systemprompt: str | None = None, stream: bool = True):
    """Run a 'searchgpt' query against SECRET_API_ENDPOINT_3.

    Streaming mode returns an asyncio.Queue fed by a background task; each item
    is a dict with either a "data"/"text" pair (an SSE chunk) or an "error"
    key, terminated by a None sentinel. Non-streaming mode returns
    {"response": <content>} directly.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    system_message = systemprompt or "Be Helpful and Friendly"
    prompt_messages = [{"role": "user", "content": query}]
    prompt_messages.insert(0, {"content": system_message, "role": "system"})
    payload = {
        "is_vscode_extension": True,
        "message_history": prompt_messages,
        "requested_model": "searchgpt",
        "user_input": prompt_messages[-1]["content"],
    }
    secret_api_endpoint_3 = get_env_vars()['secret_api_endpoint_3']
    if not secret_api_endpoint_3:
        raise HTTPException(status_code=500, detail="Search API endpoint not configured")

    client = get_async_client()

    if stream:
        queue = asyncio.Queue()

        async def _fetch_search_data_stream():
            try:
                async with client.stream("POST", secret_api_endpoint_3, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        # FIX: a streamed httpx response has no awaitable
                        # .text(); read the raw body instead.
                        error_detail = (await response.aread()).decode("utf-8", errors="ignore")
                        await queue.put({"error": f"Search API returned status code {response.status_code}: {error_detail}"})
                        return
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            try:
                                json_data = json.loads(line[6:])
                                content = json_data.get("choices", [{}])[0].get("delta", {}).get("content", "")
                                if content.strip():
                                    cleaned_response = {
                                        "created": json_data.get("created"),
                                        "id": json_data.get("id"),
                                        "model": "searchgpt",
                                        "object": "chat.completion",
                                        "choices": [
                                            {
                                                "message": {
                                                    "content": content
                                                }
                                            }
                                        ],
                                    }
                                    await queue.put({"data": f"data: {json.dumps(cleaned_response)}\n\n", "text": content})
                            except json.JSONDecodeError:
                                # FIX: the sentinel arrives as "data: [DONE]",
                                # so compare the payload after the prefix.
                                if line[6:].strip() == "[DONE]":
                                    continue
                                print(f"Warning: Could not decode JSON from search API stream: {line}")
                                await queue.put({"error": f"Invalid JSON from search API: {line}"})
                                break
                    await queue.put(None)  # normal end-of-stream sentinel
            except Exception as e:
                print(f"Error in _fetch_search_data_stream: {e}")
                await queue.put({"error": str(e)})
                await queue.put(None)

        asyncio.create_task(_fetch_search_data_stream())
        return queue
    else:
        try:
            response = await client.post(secret_api_endpoint_3, json=payload, headers=headers)
            response.raise_for_status()
            json_data = response.json()
            content = json_data.get("choices", [{}])[0].get("message", {}).get("content", "")
            return {"response": content}
        except httpx.HTTPStatusError as e:
            raise HTTPException(status_code=e.response.status_code, detail=f"Search API returned status {e.response.status_code}: {e.response.text}")
        except httpx.RequestError as e:
            raise HTTPException(status_code=502, detail=f"Failed to connect to search API: {str(e)}")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"An unexpected error occurred during search: {str(e)}")


@lru_cache(maxsize=10)
def read_html_file(file_path):
    """Read a static HTML file (cached); return None if it does not exist."""
    try:
        with open(file_path, "r") as file:
            return file.read()
    except FileNotFoundError:
        return None


@app.get("/favicon.ico")
async def favicon():
    favicon_path = Path(__file__).parent / "favicon.ico"
    return FileResponse(favicon_path, media_type="image/x-icon")


@app.get("/banner.jpg")
async def banner():
    banner_path = Path(__file__).parent / "banner.jpg"
    return FileResponse(banner_path, media_type="image/jpeg")


@app.get("/ping")
async def ping():
    return {"message": "pong", "response_time": "0.000000 seconds"}


@app.get("/", response_class=HTMLResponse)
async def root():
    html_content = read_html_file("index.html")
    if html_content is None:
        raise HTTPException(status_code=404, detail="index.html not found")
    return HTMLResponse(content=html_content)


@app.get("/scraper", response_class=PlainTextResponse)
async def scrape_site(url: str = Query(..., description="URL to scrape")):
    """Fetch a URL through cloudscraper (runs in the thread pool)."""
    try:
        loop = asyncio.get_running_loop()
        response_text = await loop.run_in_executor(
            executor, lambda: get_scraper().get(url).text
        )
        if response_text and len(response_text.strip()) > 0:
            return PlainTextResponse(response_text)
        else:
            raise HTTPException(status_code=500, detail="Scraping returned empty content.")
    except Exception as e:
        print(f"Cloudscraper failed: {e}")
        raise HTTPException(status_code=500, detail=f"Cloudscraper failed: {e}")


@app.get("/playground", response_class=HTMLResponse)
async def playground():
    html_content = read_html_file("playground.html")
    if html_content is None:
        raise HTTPException(status_code=404, detail="playground.html not found")
    return HTMLResponse(content=html_content)


@app.get("/image-playground", response_class=HTMLResponse)
async def image_playground():
    html_content = read_html_file("image-playground.html")
    if html_content is None:
        raise HTTPException(status_code=404, detail="image-playground.html not found")
    return HTMLResponse(content=html_content)


# FIX: the URL was wrapped in markdown link syntax ("[url](url)"), which made
# it unusable as a raw base URL.
GITHUB_BASE = "https://raw.githubusercontent.com/Parthsadaria/Vetra/main"
FILES = {
    "html": "index.html",
    "css": "style.css",
    "js": "script.js",
}


@app.get("/docs", include_in_schema=False)
async def custom_documentation():
    # NOTE(review): the original HTML markup appears to have been stripped from
    # this copy of the file; only the page text survives. Restore the full page
    # from version control if available.
    return HTMLResponse(""" Loki API - The Good Stuff """)


header_url = os.getenv('HEADER_URL')


# Drop-in simplified version that forwards upstream errors directly
# (no custom mapping, no rewriting — pure pass-through).
@app.post("/chat/completions")
@app.post("/api/v1/chat/completions")
async def get_completion(payload: Payload, request: Request, authenticated: bool = Depends(verify_api_key)):
    """Proxy an OpenAI-style chat completion to the provider that owns the model.

    Routing is decided by membership in the hardcoded model sets; upstream
    errors, both streamed and not, are forwarded verbatim to the caller.
    """
    if not server_status:
        raise HTTPException(
            status_code=503,
            detail="Server is under maintenance. Please try again later."
        )
    print("User request payload:", payload.dict())
    model_to_use = payload.model or "mistral-small-latest"
    if available_model_ids and model_to_use not in set(available_model_ids):
        raise HTTPException(
            status_code=400,
            detail=f"Model '{model_to_use}' is not available. Check /models for the available model list."
        )
    usage_tracker.record_request(request=request, model=model_to_use, endpoint="/chat/completions")

    payload_dict = payload.dict(exclude_none=True)
    # Forward tools & tool_choice untouched.
    if payload.tools is not None:
        payload_dict["tools"] = payload.tools
    if payload.tool_choice is not None:
        payload_dict["tool_choice"] = payload.tool_choice
    stream_enabled = payload_dict.get("stream", True)

    env_vars = get_env_vars()
    client = get_async_client()
    endpoint = None
    custom_headers = {}
    target_url_path = "/v1/chat/completions"

    # Model routing: first matching set wins.
    if model_to_use in mistral_models:
        endpoint = env_vars['mistral_api']
        custom_headers = {"Authorization": f"Bearer {env_vars['mistral_key']}"}
    elif model_to_use in bytez_models:
        endpoint = env_vars['secret_api_endpoint_7']
        custom_headers = {"Authorization": f"Bearer {env_vars['bytez_key']}"}
    elif model_to_use in pollinations_models:
        endpoint = env_vars['secret_api_endpoint_4']
        custom_headers = {"Authorization": f"Bearer {env_vars['pollinations_key']}"}
    elif model_to_use in alternate_models:
        endpoint = env_vars['secret_api_endpoint_2']
    elif model_to_use in claude_3_models:
        endpoint = env_vars['secret_api_endpoint_5']
    elif model_to_use in gemini_models:
        endpoint = env_vars['secret_api_endpoint_6']
        custom_headers = {"Authorization": f"Bearer {env_vars['gemini_key']}"}
        target_url_path = "/chat/completions"  # Gemini uses a different path
    else:
        endpoint = env_vars['secret_api_endpoint']
        custom_headers = {
            "Origin": header_url,
            "Priority": "u=1, i",
            "Referer": header_url
        }

    if not endpoint:
        raise HTTPException(status_code=500, detail=f"No API endpoint configured for model: {model_to_use}")
    print(f"Proxying request for model '{model_to_use}' to endpoint: {endpoint}{target_url_path}")

    # --- STREAM MODE ---
    if stream_enabled:
        async def real_time_stream_generator():
            try:
                async with client.stream(
                    "POST",
                    f"{endpoint}{target_url_path}",
                    json=payload_dict,
                    headers=custom_headers
                ) as response:
                    print("API response status (stream):", response.status_code)
                    print("API response headers (stream):", dict(response.headers))
                    # Forward upstream errors as-is.
                    # NOTE(review): raising inside the generator after the 200
                    # response has started cannot change the HTTP status the
                    # client already received — it only aborts the stream.
                    if response.status_code >= 400:
                        raw = await response.aread()
                        raise HTTPException(
                            status_code=response.status_code,
                            detail=raw.decode("utf-8", errors="ignore")
                        )
                    async for line in response.aiter_lines():
                        if line:
                            print("API response chunk (stream):", line)
                            yield line + "\n"
            except httpx.TimeoutException:
                raise HTTPException(status_code=504, detail="Request to upstream AI service timed out.")
            except httpx.RequestError as e:
                raise HTTPException(status_code=502, detail=str(e))
            except Exception as e:
                if isinstance(e, HTTPException):
                    raise
                raise HTTPException(status_code=500, detail=str(e))

        return StreamingResponse(
            real_time_stream_generator(),
            media_type="text/event-stream",
            headers={
                "Content-Type": "text/event-stream",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no"
            }
        )
    # --- NON-STREAM MODE ---
    else:
        try:
            response = await client.post(
                f"{endpoint}{target_url_path}",
                json=payload_dict,
                headers=custom_headers,
            )
            print("API response (non-stream):", response.text)
            if response.status_code >= 400:
                # Forward the EXACT upstream message.
                raise HTTPException(
                    status_code=response.status_code,
                    detail=response.text
                )
            return JSONResponse(content=response.json())
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Request to upstream AI service timed out.")
        except httpx.RequestError as e:
            raise HTTPException(status_code=502, detail=str(e))
        except Exception as e:
            if isinstance(e, HTTPException):
                raise
            raise HTTPException(status_code=500, detail=str(e))


@lru_cache(maxsize=256)
def cached_url(url: str):
    # NOTE(review): identity function — lru_cache on it caches nothing useful.
    # Kept for interface stability; consider removing.
    return url


@app.get("/images/{prompt:path}")
async def create_image(
    prompt: str,
    request: Request,
    authenticated: bool = Depends(verify_api_key)
):
    """Proxy an image generation request to pollinations.ai."""
    if not server_status:
        raise HTTPException(status_code=503, detail="Server is under maintenance.")

    base = "https://image.pollinations.ai/prompt/"
    final_url = f"{base}{prompt}?nologo=true"
    # Append any caller-supplied query params.
    if request.url.query:
        final_url += f"&{request.url.query}"
    final_url = cached_url(final_url)

    try:
        async with httpx.AsyncClient(timeout=60) as client:
            # client.get() buffers the whole body, so iterating resp.aiter_bytes()
            # after the client closes is safe (it replays the buffered content).
            resp = await client.get(final_url)
            if resp.status_code != 200:
                raise HTTPException(status_code=resp.status_code, detail="Image generation failed.")
            return StreamingResponse(
                resp.aiter_bytes(),
                media_type="image/jpeg"
            )
    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="Image generation timeout.")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")


@app.get("/usage")
async def get_usage_json(days: int = 7):
    return usage_tracker.get_usage_summary(days)


@app.get("/usage/page", response_class=HTMLResponse)
async def get_usage_page(days: int = Query(7, description="Number of days to include in the usage summary")):
    usage_data = usage_tracker.get_usage_summary(days)
    html_content = generate_usage_html(usage_data, days)
    return HTMLResponse(content=html_content)


def generate_usage_html(usage_data: dict, days: int = 7):
    """Render the usage-analytics dashboard as an HTML page.

    NOTE(review): the original file's HTML/CSS/chart markup was stripped by
    whatever mangled this source; all dynamic placeholders survived and are
    preserved below inside minimal reconstructed markup. Restore the styled
    template from version control if available.
    """
    # Chart series — presumably consumed by stripped inline chart scripts;
    # kept so a restored template can interpolate them. TODO confirm.
    model_labels = list(usage_data['model_usage_period'].keys())
    model_counts = list(usage_data['model_usage_period'].values())
    endpoint_labels = list(usage_data['endpoint_usage_period'].keys())
    endpoint_counts = list(usage_data['endpoint_usage_period'].values())
    daily_dates = list(usage_data['daily_usage_period'].keys())
    daily_requests = [data['requests'] for data in usage_data['daily_usage_period'].values()]
    daily_unique_ips = [data['unique_ips_count'] for data in usage_data['daily_usage_period'].values()]

    daily_usage_table_rows = "\n".join([
        f"<tr><td>{date}</td><td>{data['requests']:,}</td><td>{data['unique_ips_count']:,}</td></tr>"
        for date, data in usage_data['daily_usage_period'].items()
    ])
    model_usage_all_time_rows = "\n".join([
        f"<tr><td>{model}</td><td>{stats['total_requests']:,}</td>"
        f"<td>{datetime.datetime.fromisoformat(stats['first_used']).strftime('%Y-%m-%d %H:%M')}</td>"
        f"<td>{datetime.datetime.fromisoformat(stats['last_used']).strftime('%Y-%m-%d %H:%M')}</td></tr>"
        for model, stats in usage_data['all_time_model_usage'].items()
    ])
    api_usage_all_time_rows = "\n".join([
        f"<tr><td>{endpoint}</td><td>{stats['total_requests']:,}</td>"
        f"<td>{datetime.datetime.fromisoformat(stats['first_used']).strftime('%Y-%m-%d %H:%M')}</td>"
        f"<td>{datetime.datetime.fromisoformat(stats['last_used']).strftime('%Y-%m-%d %H:%M')}</td></tr>"
        for endpoint, stats in usage_data['all_time_endpoint_usage'].items()
    ])
    recent_requests_rows = "\n".join([
        f"<tr><td>{datetime.datetime.fromisoformat(req['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}</td>"
        f"<td>{req['model']}</td><td>{req['endpoint']}</td><td>{req['ip_address']}</td>"
        f"<td>{req['user_agent'][:50]}...</td></tr>"
        for req in usage_data['recent_requests']
    ])

    html_content = f"""<!DOCTYPE html>
<html>
<head><title>Lokiai AI - Usage Analytics Dashboard</title></head>
<body>
<h1>Lokiai AI - Usage Analytics Dashboard</h1>

<section>
<h2>Total Requests</h2><p>{usage_data['total_requests']:,}</p><p>All Time</p>
<h2>Unique Users</h2><p>{usage_data['unique_ips_total_count']:,}</p><p>All Time</p>
<h2>Active Models</h2><p>{len(usage_data['model_usage_period'])}</p><p>Last {days} Days</p>
<h2>API Endpoints</h2><p>{len(usage_data['endpoint_usage_period'])}</p><p>Last {days} Days</p>
</section>

<section>
<h2>Daily Breakdown</h2>
<p>Last {days} days detailed view</p>
<table>
<thead><tr><th>Date</th><th>Requests</th><th>Unique IPs</th></tr></thead>
<tbody>
{daily_usage_table_rows}
</tbody>
</table>
</section>

<section>
<h2>Model Statistics</h2>
<p>All-time model usage data</p>
<table>
<thead><tr><th>Model</th><th>Requests</th><th>First Used</th><th>Last Used</th></tr></thead>
<tbody>
{model_usage_all_time_rows}
</tbody>
</table>
</section>

<section>
<h2>API Endpoint Details</h2>
<p>Complete endpoint usage statistics</p>
<table>
<thead><tr><th>Endpoint</th><th>Total Requests</th><th>First Used</th><th>Last Used</th></tr></thead>
<tbody>
{api_usage_all_time_rows}
</tbody>
</table>
</section>

<section>
<h2>Recent Activity</h2>
<p>Last 20 requests in real-time</p>
<table>
<thead><tr><th>Timestamp</th><th>Model</th><th>Endpoint</th><th>IP Address</th><th>User Agent</th></tr></thead>
<tbody>
{recent_requests_rows}
</tbody>
</table>
</section>
</body>
</html>"""
    return html_content


@app.on_event("startup")
async def startup_event():
    """Build the merged model list, warm the scraper pool, and audit env vars."""
    global available_model_ids
    models_data = load_models_data()
    available_model_ids = [m['id'] for m in models_data if isinstance(m, dict) and 'id' in m]
    # Merge in all hardcoded model sets, then de-duplicate.
    available_model_ids.extend(list(pollinations_models))
    available_model_ids.extend(list(alternate_models))
    available_model_ids.extend(list(mistral_models))
    available_model_ids.extend(list(claude_3_models))
    available_model_ids.extend(list(gemini_models))
    available_model_ids.extend(list(bytez_models))
    available_model_ids = list(set(available_model_ids))
    print(f"Total unique available models after merging: {len(available_model_ids)}")

    for _ in range(MAX_SCRAPERS):
        scraper_pool.append(cloudscraper.create_scraper())
    print(f"Initialized Cloudscraper pool with {MAX_SCRAPERS} instances.")

    env_vars = get_env_vars()
    missing_vars = []
    if not env_vars['api_keys'] or env_vars['api_keys'] == ['']:
        missing_vars.append('API_KEYS')
    if not env_vars['secret_api_endpoint']:
        missing_vars.append('SECRET_API_ENDPOINT')
    if not env_vars['secret_api_endpoint_2']:
        missing_vars.append('SECRET_API_ENDPOINT_2')
    if not env_vars['secret_api_endpoint_3']:
        missing_vars.append('SECRET_API_ENDPOINT_3')
    if not env_vars['secret_api_endpoint_4'] and any(model in pollinations_models for model in available_model_ids):
        missing_vars.append('SECRET_API_ENDPOINT_4 (Pollinations.ai)')
    if not env_vars['secret_api_endpoint_5'] and any(model in claude_3_models for model in available_model_ids):
        missing_vars.append('SECRET_API_ENDPOINT_5 (Claude 3.x)')
    if not env_vars['secret_api_endpoint_6'] and any(model in gemini_models for model in available_model_ids):
        missing_vars.append('SECRET_API_ENDPOINT_6 (Gemini)')
    # Consistency fix: /health audits the Bytez vars, so the startup audit
    # should too.
    if not env_vars['secret_api_endpoint_7'] and any(model in bytez_models for model in available_model_ids):
        missing_vars.append('SECRET_API_ENDPOINT_7 (Bytez)')
    if not env_vars['bytez_key'] and any(model in bytez_models for model in available_model_ids):
        missing_vars.append('BYTEZ_KEY (Bytez)')
    if not env_vars['mistral_api'] and any(model in mistral_models for model in available_model_ids):
        missing_vars.append('MISTRAL_API')
    if not env_vars['mistral_key'] and any(model in mistral_models for model in available_model_ids):
        missing_vars.append('MISTRAL_KEY')
    if not env_vars['gemini_key'] and any(model in gemini_models for model in available_model_ids):
        missing_vars.append('GEMINI_KEY')
    if not env_vars['new_img'] and len(supported_image_models) > 0:
        missing_vars.append('NEW_IMG (Image Generation)')

    if missing_vars:
        print(f"WARNING: The following critical environment variables are missing or empty: {', '.join(missing_vars)}")
        print("Some server functionality (e.g., specific AI models, image generation) may be limited or unavailable.")
    else:
        print("All critical environment variables appear to be configured.")
    print("Server started successfully!")


@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared HTTP client, drop scrapers, and persist usage data."""
    client = get_async_client()
    await client.aclose()
    scraper_pool.clear()
    usage_tracker.save_data()
    print("Server shutdown complete!")


@app.get("/health")
async def health_check():
    """Report which critical env vars are missing and the maintenance flag."""
    env_vars = get_env_vars()
    missing_critical_vars = []
    if not env_vars['api_keys'] or env_vars['api_keys'] == ['']:
        missing_critical_vars.append('API_KEYS')
    if not env_vars['secret_api_endpoint']:
        missing_critical_vars.append('SECRET_API_ENDPOINT')
    if not env_vars['secret_api_endpoint_2']:
        missing_critical_vars.append('SECRET_API_ENDPOINT_2')
    if not env_vars['secret_api_endpoint_3']:
        missing_critical_vars.append('SECRET_API_ENDPOINT_3')
    if not env_vars['secret_api_endpoint_4'] and any(model in pollinations_models for model in available_model_ids):
        missing_critical_vars.append('SECRET_API_ENDPOINT_4 (Pollinations.ai)')
    if not env_vars['secret_api_endpoint_5'] and any(model in claude_3_models for model in available_model_ids):
        missing_critical_vars.append('SECRET_API_ENDPOINT_5 (Claude 3.x)')
    if not env_vars['secret_api_endpoint_6'] and any(model in gemini_models for model in available_model_ids):
        missing_critical_vars.append('SECRET_API_ENDPOINT_6 (Gemini)')
    if not env_vars['mistral_api'] and any(model in mistral_models for model in available_model_ids):
        missing_critical_vars.append('MISTRAL_API')
    if not env_vars['mistral_key'] and any(model in mistral_models for model in available_model_ids):
        missing_critical_vars.append('MISTRAL_KEY')
    if not env_vars['gemini_key'] and any(model in gemini_models for model in available_model_ids):
        missing_critical_vars.append('GEMINI_KEY')
    if not env_vars['new_img'] and len(supported_image_models) > 0:
        missing_critical_vars.append('NEW_IMG (Image Generation)')
    if not env_vars['secret_api_endpoint_7'] and any(model in bytez_models for model in available_model_ids):
        missing_critical_vars.append('SECRET_API_ENDPOINT_7 (Bytez)')
    if not env_vars['bytez_key'] and any(model in bytez_models for model in available_model_ids):
        missing_critical_vars.append('BYTEZ_KEY (Bytez)')

    health_status = {
        "status": "healthy" if not missing_critical_vars else "unhealthy",
        "missing_env_vars": missing_critical_vars,
        "server_status": server_status,
        "message": "Everything's lit! 🚀" if not missing_critical_vars else "Uh oh, some env vars are missing. 😬"
    }
    return JSONResponse(content=health_status)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)