# app.py - Worker version with Fallback and Retry Limit support
"""TTS worker service built on the Gemini API.

Exposes a FastAPI application with a ``/generate`` endpoint that turns text
into a WAV file. Two back-ends are used:

* the "standard" TTS model (``generate_content`` with audio modality), called
  chunk by chunk with a caller-supplied retry limit;
* the "live" native-audio model, used either directly or as a fallback when
  the standard model fails.

API keys are read from the ``ALL_GEMINI_API_KEYS`` environment variable
(comma-separated) and one is picked at random for every attempt.
"""
import os
import sys
import traceback
import re
import struct
import time
import uuid
import shutil
import logging
import mimetypes
import threading
import random
import asyncio
import wave

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel
from google import genai
from google.genai import types
import uvicorn

try:
    from pydub import AudioSegment
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# --- API key management settings ---
GEMINI_CLIENTS_CACHE = {}
CLIENT_CACHE_LOCK = threading.Lock()
ALL_API_KEYS: list[str] = []


def _init_api_keys():
    """Load the comma-separated API keys from ``ALL_GEMINI_API_KEYS``.

    Populates the module-level ``ALL_API_KEYS`` list and logs a warning when
    no key ends up being available.
    """
    global ALL_API_KEYS
    all_keys_string = os.environ.get("ALL_GEMINI_API_KEYS")
    if all_keys_string:
        ALL_API_KEYS = [key.strip() for key in all_keys_string.split(',') if key.strip()]
        logging.info(f"✅ تعداد {len(ALL_API_KEYS)} کلید API جیمینای شناسایی و بارگذاری شد.")
    if not ALL_API_KEYS:
        logging.warning("⛔️ هشدار: هیچ Secret با نام ALL_GEMINI_API_KEYS یافت نشد!")


def get_random_api_key_and_client():
    """Pick a random API key and return ``(key, client)``.

    Clients are created lazily and cached per key under
    ``CLIENT_CACHE_LOCK``. Returns ``(None, None)`` when no key is loaded.
    """
    if not ALL_API_KEYS:
        return None, None
    key_to_use = random.choice(ALL_API_KEYS)
    with CLIENT_CACHE_LOCK:
        client = GEMINI_CLIENTS_CACHE.get(key_to_use)
        if client is None:
            client = genai.Client(api_key=key_to_use)
            GEMINI_CLIENTS_CACHE[key_to_use] = client
    return key_to_use, client


FIXED_MODEL_NAME_STANDARD = "gemini-2.5-flash-preview-tts"
FIXED_MODEL_NAME_LIVE = "models/gemini-2.5-flash-native-audio-preview-12-2025"
DEFAULT_MAX_CHUNK_SIZE = 3800
DEFAULT_SLEEP_BETWEEN_REQUESTS = 5


def save_binary_file(file_name, data):
    """Write *data* to *file_name*; return the path, or ``None`` on failure."""
    try:
        with open(file_name, "wb") as f:
            f.write(data)
        return file_name
    except Exception as e:
        logging.error(f"❌ خطا در ذخیره فایل {file_name}: {e}")
        return None


def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
    """Prefix raw PCM *audio_data* with a 44-byte mono PCM WAV header.

    Sample width and rate are taken from *mime_type* via
    ``parse_audio_mime_type``; the channel count is fixed at 1.
    """
    parameters = parse_audio_mime_type(mime_type)
    bits_per_sample = parameters["bits_per_sample"]
    rate = parameters["rate"]
    num_channels = 1
    data_size = len(audio_data)
    block_align = num_channels * (bits_per_sample // 8)
    byte_rate = rate * block_align
    chunk_size = 36 + data_size  # RIFF chunk size = header remainder + payload
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", chunk_size, b"WAVE",
        b"fmt ", 16, 1, num_channels, rate, byte_rate, block_align, bits_per_sample,
        b"data", data_size,
    )
    return header + audio_data


def parse_audio_mime_type(mime_type: str) -> dict[str, int]:
    """Extract ``bits_per_sample`` and ``rate`` from an ``audio/L<bits>;rate=<hz>`` MIME type.

    Falls back to 16 bits / 24000 Hz for any part that is missing or not a
    valid integer.
    """
    bits, rate = 16, 24000
    for param in mime_type.split(";"):
        param = param.strip()
        if param.lower().startswith("rate="):
            try:
                rate = int(param.split("=", 1)[1])
            except ValueError:
                pass  # keep the default rate
        elif param.startswith("audio/L"):
            try:
                bits = int(param.split("L", 1)[1])
            except ValueError:
                pass  # keep the default sample width
    return {"bits_per_sample": bits, "rate": rate}


def smart_text_split(text, max_size=3800):
    """Split *text* into chunks of at most *max_size* characters.

    Splitting prefers sentence boundaries (``. ! ?`` and the Arabic question
    mark followed by whitespace). A single sentence longer than *max_size* is
    cut at the last separator (comma, semicolon, colon or space) found in the
    second half of the window, or hard-cut at *max_size* when none exists.
    Empty chunks are dropped.

    Raises:
        ValueError: if *max_size* is smaller than 1 (the hard-cut step could
            never make progress).
    """
    if max_size < 1:
        raise ValueError("max_size must be at least 1")
    if len(text) <= max_size:
        return [text]

    separators = ('،', ',', ';', ':', ' ')
    chunks, current_chunk = [], ""
    sentences = re.split(r'(?<=[.!?؟])\s+', text)
    for sentence in sentences:
        if len(current_chunk) + len(sentence) + 1 > max_size:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence
            # The sentence alone may still be too long: peel off pieces.
            while len(current_chunk) > max_size:
                split_idx = next(
                    (i for i in range(max_size - 1, max_size // 2, -1)
                     if current_chunk[i] in separators),
                    -1,
                )
                if split_idx != -1:
                    part = current_chunk[:split_idx + 1]
                    current_chunk = current_chunk[split_idx + 1:]
                else:
                    part = current_chunk[:max_size]
                    current_chunk = current_chunk[max_size:]
                chunks.append(part.strip())
        else:
            current_chunk += (" " if current_chunk else "") + sentence
    if current_chunk:
        chunks.append(current_chunk.strip())
    return [c for c in chunks if c]


def merge_audio_files_func(file_paths, output_path):
    """Concatenate audio files into one WAV with 150 ms of silence between parts.

    Requires pydub. Missing input files are skipped with a warning.
    Returns ``True`` on success, ``False`` if pydub is unavailable or any
    error occurs.
    """
    if not PYDUB_AVAILABLE:
        logging.warning("⚠️ pydub برای ادغام در دسترس نیست.")
        return False
    try:
        combined = AudioSegment.empty()
        last_index = len(file_paths) - 1
        for i, fp in enumerate(file_paths):
            if os.path.exists(fp):
                gap = AudioSegment.silent(duration=150) if i < last_index else AudioSegment.empty()
                combined += AudioSegment.from_file(fp) + gap
            else:
                logging.warning(f"⚠️ فایل برای ادغام پیدا نشد: {fp}")
        combined.export(output_path, format="wav")
        return True
    except Exception as e:
        logging.error(f"❌ خطا در ادغام فایل‌های صوتی: {e}")
        return False


# --- Gemini Live logic ---
async def generate_audio_live_with_retry(text, prompt, voice, session_id):
    """Generate raw PCM audio for *text* with the Live model, retrying up to 50 times.

    Each attempt uses a randomly chosen API key and a fresh ``v1beta`` client.
    Returns the collected audio as a ``bytearray``, or ``None`` when every
    attempt failed or no API key is available.
    """
    MAX_RETRIES = 50
    live_config = types.LiveConnectConfig(
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice)
            )
        ),
    )
    for attempt in range(MAX_RETRIES):
        selected_api_key, _ = get_random_api_key_and_client()
        if not selected_api_key:
            break
        client = genai.Client(http_options={"api_version": "v1beta"}, api_key=selected_api_key)
        unique_id_for_req = str(uuid.uuid4())[:8]
        tts_prompt = f"Please read the following text naturally: '{text}' [ID: {unique_id_for_req}]"
        if prompt:
            tts_prompt = f"With a {prompt} tone, please read: '{text}'"
        try:
            logging.info(f"[{session_id}] (Live) تلاش {attempt+1} با کلید ...{selected_api_key[-4:]}")
            audio_buffer = bytearray()
            async with client.aio.live.connect(model=FIXED_MODEL_NAME_LIVE, config=live_config) as session:
                await session.send(input=tts_prompt, end_of_turn=True)
                async for response in session.receive():
                    if response.data:
                        audio_buffer.extend(response.data)
            if len(audio_buffer) > 0:
                logging.info(f"[{session_id}] ✅ (Live) موفقیت‌آمیز.")
                return audio_buffer
            raise Exception("بافر صوتی خالی بود.")
        except Exception as e:
            logging.warning(f"[{session_id}] ⚠️ (Live) خطا در تلاش {attempt+1}: {e}")
            # Non-blocking pause: this is a coroutine, so never time.sleep() here.
            await asyncio.sleep(0.5)
    return None


def save_pcm_to_wav(pcm_data, output_path):
    """Write *pcm_data* to *output_path* as mono, 16-bit, 24 kHz WAV.

    Returns ``True`` on success and ``False`` (after logging) on any error.
    """
    try:
        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(24000)
            wf.writeframes(pcm_data)
        return True
    except Exception as e:
        logging.error(f"خطا در تبدیل PCM به WAV: {e}")
        return False


# --- Gemini Standard logic (revised with retry_limit) ---
def generate_audio_chunk_standard_with_retry(chunk_text, prompt_text, voice, temp, session_id, retry_limit):
    """Generate audio for one text chunk with the standard TTS model.

    Makes up to *retry_limit* attempts (the limit set by the Manager), each
    with a randomly chosen API key, pausing 0.5 s after every unsuccessful
    attempt. Returns the ``inline_data`` object of the first response part on
    success, or ``None`` when all attempts fail.

    Raises:
        Exception: if no API key has been loaded.
    """
    if not ALL_API_KEYS:
        raise Exception("هیچ کلید API در دسترس نیست.")

    max_retries = retry_limit
    has_prompt = bool(prompt_text and prompt_text.strip())
    final_text = f'{chunk_text}({prompt_text})' if has_prompt else chunk_text

    for attempt in range(max_retries):
        selected_api_key, client = get_random_api_key_and_client()
        if not client:
            break
        try:
            contents = [types.Content(role="user", parts=[types.Part.from_text(text=final_text)])]
            config = types.GenerateContentConfig(
                temperature=temp,
                response_modalities=["audio"],
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice)
                    )
                ),
            )
            response = client.models.generate_content(
                model=FIXED_MODEL_NAME_STANDARD, contents=contents, config=config
            )
            if (response.candidates
                    and response.candidates[0].content
                    and response.candidates[0].content.parts
                    and response.candidates[0].content.parts[0].inline_data):
                logging.info(f"[{session_id}] ✅ (Standard) موفقیت در تلاش {attempt+1}.")
                return response.candidates[0].content.parts[0].inline_data
        except Exception as e:
            logging.warning(f"[{session_id}] ⚠️ (Standard) خطا در تلاش {attempt+1}: {e}")
        # Reached after an exception or an empty response: back off briefly.
        time.sleep(0.5)
    return None


def core_generate_audio(text_input, prompt_input, selected_voice, temperature_val, session_id,
                        use_live_model=False, retry_limit=50, fallback_to_live=False):
    """Produce ``output_<session_id>.wav`` for *text_input* and return its path.

    * ``use_live_model=True``: the whole text is sent to the Live model.
    * otherwise the text is chunked and each chunk goes to the standard
      model with *retry_limit* attempts; if a chunk fails and
      *fallback_to_live* is set, the whole text is retried with the Live
      model.

    The per-session temp directory is always removed before returning.

    Raises:
        Exception: when no audio could be produced by the selected path(s).
    """
    logging.info(f"[{session_id}] 🚀 شروع: Live={use_live_model}, Retry={retry_limit}, Fallback={fallback_to_live}")
    temp_dir = f"temp_{session_id}"
    os.makedirs(temp_dir, exist_ok=True)
    output_base_name = f"{temp_dir}/audio_session_{session_id}"
    final_output_path = f"output_{session_id}.wav"

    try:
        # 1. The Live model was requested directly (e.g. a free-tier user).
        if use_live_model:
            pcm_data = asyncio.run(
                generate_audio_live_with_retry(text_input, prompt_input, selected_voice, session_id)
            )
            if pcm_data and save_pcm_to_wav(pcm_data, final_output_path):
                return final_output_path
            raise Exception("تولید صدا با مدل لایف ناموفق بود.")

        # 2. Use the standard model.
        text_chunks = smart_text_split(text_input, DEFAULT_MAX_CHUNK_SIZE)
        generated_files = []
        standard_failed = False

        for i, chunk in enumerate(text_chunks):
            # Try the standard model up to retry_limit times.
            inline_data = generate_audio_chunk_standard_with_retry(
                chunk, prompt_input, selected_voice, temperature_val, session_id, retry_limit
            )
            if not inline_data:
                standard_failed = True
                break  # one of the chunks could not be generated

            data_buffer = inline_data.data
            ext = mimetypes.guess_extension(inline_data.mime_type) or ".wav"
            # Raw "audio/L<bits>" PCM has no container; wrap it in a WAV header.
            if "audio/L" in inline_data.mime_type and ext == ".wav":
                data_buffer = convert_to_wav(data_buffer, inline_data.mime_type)
            if not ext.startswith("."):
                ext = "." + ext
            fpath = save_binary_file(f"{output_base_name}_part{i+1:03d}{ext}", data_buffer)
            if fpath:
                generated_files.append(fpath)

        # 3. Handle failure and fallback.
        if standard_failed:
            if not fallback_to_live:
                raise Exception(f"تولید صدا با مدل استاندارد پس از {retry_limit} تلاش ناموفق بود.")
            logging.info(f"[{session_id}] 🔄 مدل استاندارد شکست خورد. سوییچ به مدل لایف (Fallback)...")
            # Discard the partial results of the standard model.
            generated_files = []
            # Call the Live model for the whole text.
            pcm_data = asyncio.run(
                generate_audio_live_with_retry(text_input, prompt_input, selected_voice, session_id)
            )
            if pcm_data and save_pcm_to_wav(pcm_data, final_output_path):
                return final_output_path
            raise Exception("هم مدل استاندارد و هم مدل لایف (Fallback) شکست خوردند.")

        # The standard model succeeded: merge the parts.
        if not generated_files:
            raise Exception("هیچ فایلی تولید نشد.")

        merged = (
            len(generated_files) > 1
            and PYDUB_AVAILABLE
            and merge_audio_files_func(generated_files, final_output_path)
        )
        if not merged:
            if len(generated_files) > 1:
                # Existing best-effort behaviour, now made visible in the logs.
                logging.warning(
                    f"[{session_id}] Could not merge {len(generated_files)} chunks; "
                    "returning only the first one."
                )
            shutil.move(generated_files[0], final_output_path)
        return final_output_path
    finally:
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)


def _remove_file_quietly(path):
    """Delete *path*, logging (not raising) if the file cannot be removed."""
    try:
        os.remove(path)
    except OSError as e:
        logging.warning(f"Could not remove served file {path}: {e}")


_init_api_keys()
app = FastAPI(title="Alpha TTS Worker API")


class TTSRequest(BaseModel):
    """Request body of ``POST /generate``."""
    text: str
    prompt: str | None = ""
    speaker: str
    temperature: float
    use_live_model: bool = False
    retry_limit: int = 50  # new parameter
    fallback_to_live: bool = False  # new parameter


@app.post("/generate")
def generate_audio_endpoint(request: TTSRequest):
    """Generate speech for the request and stream the WAV file back.

    Any failure is reported as HTTP 500 with the error text as detail. The
    generated file is deleted by a background task once the response has
    been sent.
    """
    session_id = str(uuid.uuid4())[:8]
    try:
        final_path = core_generate_audio(
            text_input=request.text,
            prompt_input=request.prompt,
            selected_voice=request.speaker,
            temperature_val=request.temperature,
            session_id=session_id,
            use_live_model=request.use_live_model,
            retry_limit=request.retry_limit,
            fallback_to_live=request.fallback_to_live,
        )
    except Exception as e:
        logging.error(f"[{session_id}] ❌ خطا: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Raised outside the try block so the detail is not re-wrapped as "500: ...".
    if not (final_path and os.path.exists(final_path)):
        detail = "خطا در تولید فایل صوتی."
        logging.error(f"[{session_id}] ❌ خطا: {detail}")
        raise HTTPException(status_code=500, detail=detail)

    # The task object is passed (not called) so cleanup runs after the body is sent.
    cleanup = BackgroundTasks()
    cleanup.add_task(_remove_file_quietly, final_path)
    return FileResponse(
        path=final_path,
        media_type='audio/wav',
        filename=os.path.basename(final_path),
        background=cleanup,
    )


@app.get("/")
def health_check():
    """Liveness probe."""
    return {"status": "ok", "message": "TTS Worker is running."}


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port, reload=False)