""" Gradio UI для предсказания экзопланет """ import gradio as gr import pandas as pd import joblib import os import time from mapping import ColumnMapper, load_training_columns from dotenv import load_dotenv # Загружаем переменные окружения из .env файла load_dotenv() # Константы TRAINING_CSV_PATH = "cumulative_2025.10.03_08.34.41.csv" MODEL_PATH = "exoplanet_detector.joblib" TOGETHER_API_KEY = os.getenv("TOGETHER_API_KEY", "") # Загружаем модель и колонки тренировочного датасета model = joblib.load(MODEL_PATH) training_columns = load_training_columns(TRAINING_CSV_PATH) # Инициализируем маппер mapper = ColumnMapper(api_key=TOGETHER_API_KEY) def predict_exoplanets(uploaded_file): """ Process uploaded file and return predictions Args: uploaded_file: Uploaded CSV file Returns: Tuple (results, mapping info, statistics) """ start_time = time.time() try: # Load dataset if uploaded_file is None: return None, "Error: Please upload a CSV file", None # Read uploaded file with robust error handling try: df_uploaded = pd.read_csv(uploaded_file.name, comment='#', low_memory=False) except Exception as e: try: # Try with different encoding df_uploaded = pd.read_csv(uploaded_file.name, comment='#', encoding='latin1', low_memory=False) except Exception as e2: return None, f"Error reading CSV file: {str(e)}\nAlternative attempt: {str(e2)}", None # Ensure all columns are properly formatted (no multi-dimensional data) for col in df_uploaded.columns: # Check if column contains lists or arrays if df_uploaded[col].dtype == 'object': first_val = df_uploaded[col].dropna().iloc[0] if len(df_uploaded[col].dropna()) > 0 else None if isinstance(first_val, (list, tuple)): # Flatten lists/tuples - take first element df_uploaded[col] = df_uploaded[col].apply(lambda x: x[0] if isinstance(x, (list, tuple)) and len(x) > 0 else x) info_msg = f"Loaded rows: {len(df_uploaded)}\n" info_msg += f"Columns in uploaded dataset: {len(df_uploaded.columns)}\n\n" # Apply column mapping mapping_start = time.time() info_msg += "Performing column mapping via Llama...\n\n" df_mapped, mapping, mapping_info = mapper.map_dataset(df_uploaded, training_columns) mapping_time = time.time() - mapping_start info_msg += mapping_info + "\n" info_msg += f"Mapping time: {mapping_time:.2f} sec\n\n" # Get features expected by the model try: expected_features = list(model.feature_names_in_) info_msg += f"Model expects {len(expected_features)} features\n\n" except AttributeError: # If feature_names_in_ is not available, use all columns except targets target_cols = ['koi_disposition', 'koi_pdisposition'] expected_features = [col for col in training_columns if col not in target_cols] info_msg += f"Using {len(expected_features)} features from training dataset\n\n" # Prepare X with correct columns info_msg += f"Creating DataFrame with {len(expected_features)} columns...\n" # Create empty DataFrame with correct columns filled with zeros X = pd.DataFrame(0.0, index=df_mapped.index, columns=expected_features) # Fill columns that exist in df_mapped for col in expected_features: if col in df_mapped.columns: try: # Convert to numeric, handling any data type col_data = pd.to_numeric(df_mapped[col], errors='coerce') # Ensure we have a 1D Series, flatten if needed if hasattr(col_data, 'values'): col_values = col_data.values if len(col_values.shape) > 1: info_msg += f"Warning: Column '{col}' has shape {col_values.shape}, flattening...\n" col_values = col_values.flatten()[:len(X)] # Take only first N values X[col] = col_values else: X[col] = col_data except Exception as 
e: info_msg += f"Warning: Could not convert column '{col}': {str(e)}\n" X[col] = 0.0 # Ensure all columns are numeric X = X.apply(pd.to_numeric, errors='coerce') # Calculate statistics available_cols = [col for col in expected_features if col in df_mapped.columns] missing_cols = [col for col in expected_features if col not in df_mapped.columns] if missing_cols: info_msg += f"Warning: {len(missing_cols)} columns missing (filled with zeros)\n" info_msg += f"DEBUG: X.shape = {X.shape}, expected: ({len(df_mapped)}, {len(expected_features)})\n" # Fill NaN with column means, then with 0 for any remaining NaN X = X.fillna(X.mean()) X = X.fillna(0) # Ensure no infinite values X = X.replace([float('inf'), float('-inf')], 0) info_msg += f"DEBUG: After fillna X.shape = {X.shape}\n" info_msg += f"Data processing: {X.shape}\n" info_msg += f" Filled: {len(available_cols)} columns, Added zeros: {len(missing_cols)}\n" info_msg += f"Data prepared for model\n\n" # Make predictions pred_start = time.time() # Use numpy array instead of DataFrame to bypass feature name checks X_values = X.values # Convert to numpy array info_msg += f"DEBUG: X_values.shape = {X_values.shape}\n\n" predictions = model.predict(X_values) predictions_proba = model.predict_proba(X_values) pred_time = time.time() - pred_start info_msg += f"Predictions completed: {len(predictions)} objects in {pred_time:.2f} sec\n" # Create result dataframe df_result = df_uploaded.copy() # Get unique classes from model classes = model.classes_ info_msg += f" Found classes: {list(classes)}\n\n" # Add predictions (text labels) df_result['prediction'] = predictions # Add probabilities for each class for i, class_name in enumerate(classes): df_result[f'confidence_{class_name.replace(" ", "_").lower()}'] = predictions_proba[:, i] # Add mapping information as separate columns if mapping: for src_col, tgt_col in mapping.items(): if src_col in df_uploaded.columns and tgt_col in df_mapped.columns: df_result[f'mapped_as_{tgt_col}'] = df_uploaded[src_col] # Создаем упрощенный вывод с только важными колонками для отображения # Выбираем колонки предсказаний display_columns = ['prediction'] for class_name in classes: col_name = f'confidence_{class_name.replace(" ", "_").lower()}' if col_name in df_result.columns: display_columns.append(col_name) # Add mapped columns (if any) mapped_cols = [col for col in df_result.columns if col.startswith('mapped_as_')] display_columns.extend(mapped_cols[:10]) # Show first 10 mapped columns # If no mapped columns, add first 5 original columns if not mapped_cols and len(df_uploaded.columns) > 0: original_cols = [col for col in df_uploaded.columns[:5] if col in df_result.columns] display_columns.extend(original_cols) # Create dataframe for display df_display = df_result[display_columns].copy() total_time = time.time() - start_time # Create statistics by class from collections import Counter pred_counts = Counter(predictions) stats_lines = ["**Prediction Statistics:**\n"] stats_lines.append(f"* Total objects: {len(predictions)}\n") for class_name in classes: count = pred_counts.get(class_name, 0) pct = count / len(predictions) * 100 if len(predictions) > 0 else 0 stats_lines.append(f"* {class_name}: {count} ({pct:.1f}%)\n") stats_lines.append(f"\n**Processing time:** {total_time:.2f} seconds\n") stats_lines.append(f"\n**Columns in result:**\n") stats_lines.append(f"* All original columns from uploaded file (with original names)\n") stats_lines.append(f"* `prediction`: Predicted class ({', '.join(classes)})\n") for class_name in 
classes: col_name = f'confidence_{class_name.replace(" ", "_").lower()}' stats_lines.append(f"* `{col_name}`: Probability of class {class_name}\n") stats_lines.append(f"* Columns `mapped_as_*`: Duplicate mapped columns for reference\n") stats_lines.append(f"\n**Total columns in result:** {len(df_result.columns)}\n") stats = "".join(stats_lines) + f""" **Mapping completed:** {len(mapping)} columns renamed for model **Full dataset saved:** All {len(df_result.columns)} columns available for download """ # Save full result to temporary file for download output_file = "predictions_result.csv" df_result.to_csv(output_file, index=False) # Return simplified output for display and path to full file return df_display, info_msg, stats, output_file except Exception as e: error_msg = f"Error processing file:\n{str(e)}" import traceback error_msg += f"\n\n{traceback.format_exc()}" return None, error_msg, None, None # Create Gradio interface with gr.Blocks(title="Exoplanet Detector", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # Exoplanet Detector Upload a CSV file with data about exoplanet candidates (KOI - Kepler Objects of Interest). **How it works:** 1. Upload your dataset with any column structure 2. Llama automatically maps your columns to training columns 3. Model makes predictions: exoplanet or false positive **Model:** Random Forest Classifier **Mapping:** Llama 3.3 70B via Together AI **Note:** Processing large datasets (>1000 rows) may take several minutes. """) with gr.Row(): with gr.Column(scale=1): file_input = gr.File( label="Upload CSV file", file_types=[".csv"], type="filepath" ) submit_btn = gr.Button("Run Prediction", variant="primary", size="lg") with gr.Column(scale=2): mapping_info = gr.Textbox( label="Column Mapping Information", lines=15, max_lines=20 ) with gr.Row(): stats_output = gr.Markdown(label="Statistics") with gr.Row(): results_output = gr.Dataframe( label="Prediction Results (main columns)", wrap=True, interactive=False ) with gr.Row(): download_output = gr.File( label="Download full result with all columns", interactive=False ) # Event handler submit_btn.click( fn=predict_exoplanets, inputs=[file_input], outputs=[results_output, mapping_info, stats_output, download_output] ) gr.Markdown(""" --- ### Tips: - Make sure your CSV file contains data about stellar systems and their characteristics - The more columns match the training dataset, the more accurate the predictions will be - Model trained on NASA Exoplanet Archive data (Kepler Mission) ### Example training dataset columns: `koi_period`, `koi_depth`, `koi_prad`, `koi_teq`, `koi_insol`, `koi_steff`, `koi_slogg`, `koi_srad`, `ra`, `dec`, `koi_kepmag` etc. """) # Launch application if __name__ == "__main__": demo.launch(share=False, server_name="0.0.0.0", server_port=7860)
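
# ---------------------------------------------------------------------------
# Programmatic usage (sketch, not executed here): the prediction pipeline can
# also be driven without the Gradio UI, e.g. from a test or a batch script.
# The module name "app" and the file "candidates.csv" below are placeholders
# and depend on how this project is laid out.
#
#     from app import predict_exoplanets
#
#     display_df, mapping_log, stats_md, result_csv = predict_exoplanets("candidates.csv")
#     print(stats_md)
#     print(f"Full results saved to: {result_csv}")
# ---------------------------------------------------------------------------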