| """ | |
| Gradio UI для предсказания экзопланет | |
| """ | |
import gradio as gr
import pandas as pd
import joblib
import os
import time
from mapping import ColumnMapper, load_training_columns
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

# Constants
TRAINING_CSV_PATH = "cumulative_2025.10.03_08.34.41.csv"
MODEL_PATH = "exoplanet_detector.joblib"
TOGETHER_API_KEY = os.getenv("TOGETHER_API_KEY", "")

# Load the model and the training dataset columns
model = joblib.load(MODEL_PATH)
training_columns = load_training_columns(TRAINING_CSV_PATH)

# Initialize the column mapper
mapper = ColumnMapper(api_key=TOGETHER_API_KEY)
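
# Note: the `mapping` module is not shown in this file. Based on how it is used below,
# `load_training_columns(path)` is assumed to return the list of column names from the
# training CSV, and `ColumnMapper.map_dataset(df, training_columns)` is assumed to return
# a tuple of (mapped DataFrame, {source column: target column} dict, human-readable info).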


def predict_exoplanets(uploaded_file):
    """
    Process the uploaded file and return predictions.

    Args:
        uploaded_file: Uploaded CSV file

    Returns:
        Tuple (display dataframe, mapping info, statistics, path to the full results CSV)
    """
    start_time = time.time()

    try:
        # Load dataset
        if uploaded_file is None:
            return None, "Error: Please upload a CSV file", None, None

        # gr.File(type="filepath") passes the path as a string; older Gradio versions
        # pass a file-like object with a .name attribute, so handle both
        file_path = uploaded_file if isinstance(uploaded_file, str) else uploaded_file.name

        # Read uploaded file with robust error handling
        try:
            df_uploaded = pd.read_csv(file_path, comment='#', low_memory=False)
        except Exception as e:
            try:
                # Try with a different encoding
                df_uploaded = pd.read_csv(file_path, comment='#', encoding='latin1', low_memory=False)
            except Exception as e2:
                return None, f"Error reading CSV file: {str(e)}\nAlternative attempt: {str(e2)}", None, None

        # Ensure all columns are properly formatted (no multi-dimensional data)
        for col in df_uploaded.columns:
            # Check if column contains lists or arrays
            if df_uploaded[col].dtype == 'object':
                first_val = df_uploaded[col].dropna().iloc[0] if len(df_uploaded[col].dropna()) > 0 else None
                if isinstance(first_val, (list, tuple)):
                    # Flatten lists/tuples - take first element
                    df_uploaded[col] = df_uploaded[col].apply(lambda x: x[0] if isinstance(x, (list, tuple)) and len(x) > 0 else x)

        info_msg = f"Loaded rows: {len(df_uploaded)}\n"
        info_msg += f"Columns in uploaded dataset: {len(df_uploaded.columns)}\n\n"

        # Apply column mapping
        mapping_start = time.time()
        info_msg += "Performing column mapping via Llama...\n\n"
        df_mapped, mapping, mapping_info = mapper.map_dataset(df_uploaded, training_columns)
        mapping_time = time.time() - mapping_start
        info_msg += mapping_info + "\n"
        info_msg += f"Mapping time: {mapping_time:.2f} sec\n\n"

        # Get features expected by the model
        try:
            expected_features = list(model.feature_names_in_)
            info_msg += f"Model expects {len(expected_features)} features\n\n"
        except AttributeError:
            # If feature_names_in_ is not available, use all columns except targets
            target_cols = ['koi_disposition', 'koi_pdisposition']
            expected_features = [col for col in training_columns if col not in target_cols]
            info_msg += f"Using {len(expected_features)} features from training dataset\n\n"

        # Prepare X with correct columns
        info_msg += f"Creating DataFrame with {len(expected_features)} columns...\n"

        # Create empty DataFrame with correct columns filled with zeros
        X = pd.DataFrame(0.0, index=df_mapped.index, columns=expected_features)

        # Fill columns that exist in df_mapped
        for col in expected_features:
            if col in df_mapped.columns:
                try:
                    # Convert to numeric, handling any data type
                    col_data = pd.to_numeric(df_mapped[col], errors='coerce')
                    # Ensure we have a 1D Series, flatten if needed
                    if hasattr(col_data, 'values'):
                        col_values = col_data.values
                        if len(col_values.shape) > 1:
                            info_msg += f"Warning: Column '{col}' has shape {col_values.shape}, flattening...\n"
                            col_values = col_values.flatten()[:len(X)]  # Take only first N values
                        X[col] = col_values
                    else:
                        X[col] = col_data
                except Exception as e:
                    info_msg += f"Warning: Could not convert column '{col}': {str(e)}\n"
                    X[col] = 0.0

        # Ensure all columns are numeric
        X = X.apply(pd.to_numeric, errors='coerce')

        # Calculate statistics
        available_cols = [col for col in expected_features if col in df_mapped.columns]
        missing_cols = [col for col in expected_features if col not in df_mapped.columns]
        if missing_cols:
            info_msg += f"Warning: {len(missing_cols)} columns missing (filled with zeros)\n"

        info_msg += f"DEBUG: X.shape = {X.shape}, expected: ({len(df_mapped)}, {len(expected_features)})\n"

        # Fill NaN with column means, then with 0 for any remaining NaN
        X = X.fillna(X.mean())
        X = X.fillna(0)

        # Ensure no infinite values
        X = X.replace([float('inf'), float('-inf')], 0)

        info_msg += f"DEBUG: After fillna X.shape = {X.shape}\n"
        info_msg += f"Data processing: {X.shape}\n"
        info_msg += f"  Filled: {len(available_cols)} columns, Added zeros: {len(missing_cols)}\n"
        info_msg += "Data prepared for model\n\n"

        # Make predictions
        pred_start = time.time()

        # Use numpy array instead of DataFrame to bypass feature name checks
        X_values = X.values  # Convert to numpy array
        info_msg += f"DEBUG: X_values.shape = {X_values.shape}\n\n"

        predictions = model.predict(X_values)
        predictions_proba = model.predict_proba(X_values)
        pred_time = time.time() - pred_start
        info_msg += f"Predictions completed: {len(predictions)} objects in {pred_time:.2f} sec\n"

        # Create result dataframe
        df_result = df_uploaded.copy()

        # Get unique classes from model
        classes = model.classes_
        info_msg += f"  Found classes: {list(classes)}\n\n"

        # Add predictions (text labels)
        df_result['prediction'] = predictions

        # Add probabilities for each class
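        # (e.g., a class labeled "FALSE POSITIVE" becomes a "confidence_false_positive" column)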
        for i, class_name in enumerate(classes):
            df_result[f'confidence_{class_name.replace(" ", "_").lower()}'] = predictions_proba[:, i]

        # Add mapping information as separate columns
        if mapping:
            for src_col, tgt_col in mapping.items():
                if src_col in df_uploaded.columns and tgt_col in df_mapped.columns:
                    df_result[f'mapped_as_{tgt_col}'] = df_uploaded[src_col]

        # Create a simplified view with only the important columns for display
        # Select the prediction columns
        display_columns = ['prediction']
        for class_name in classes:
            col_name = f'confidence_{class_name.replace(" ", "_").lower()}'
            if col_name in df_result.columns:
                display_columns.append(col_name)

        # Add mapped columns (if any)
        mapped_cols = [col for col in df_result.columns if col.startswith('mapped_as_')]
        display_columns.extend(mapped_cols[:10])  # Show first 10 mapped columns

        # If no mapped columns, add first 5 original columns
        if not mapped_cols and len(df_uploaded.columns) > 0:
            original_cols = [col for col in df_uploaded.columns[:5] if col in df_result.columns]
            display_columns.extend(original_cols)

        # Create dataframe for display
        df_display = df_result[display_columns].copy()

        total_time = time.time() - start_time

        # Create statistics by class
        from collections import Counter
        pred_counts = Counter(predictions)
| stats_lines = ["**Prediction Statistics:**\n"] | |
| stats_lines.append(f"* Total objects: {len(predictions)}\n") | |
| for class_name in classes: | |
| count = pred_counts.get(class_name, 0) | |
| pct = count / len(predictions) * 100 if len(predictions) > 0 else 0 | |
| stats_lines.append(f"* {class_name}: {count} ({pct:.1f}%)\n") | |
| stats_lines.append(f"\n**Processing time:** {total_time:.2f} seconds\n") | |
| stats_lines.append(f"\n**Columns in result:**\n") | |
| stats_lines.append(f"* All original columns from uploaded file (with original names)\n") | |
| stats_lines.append(f"* `prediction`: Predicted class ({', '.join(classes)})\n") | |
| for class_name in classes: | |
| col_name = f'confidence_{class_name.replace(" ", "_").lower()}' | |
| stats_lines.append(f"* `{col_name}`: Probability of class {class_name}\n") | |
| stats_lines.append(f"* Columns `mapped_as_*`: Duplicate mapped columns for reference\n") | |
| stats_lines.append(f"\n**Total columns in result:** {len(df_result.columns)}\n") | |
| stats = "".join(stats_lines) + f""" | |
| **Mapping completed:** {len(mapping)} columns renamed for model | |
| **Full dataset saved:** All {len(df_result.columns)} columns available for download | |
| """ | |

        # Save full result to a temporary file for download
        output_file = "predictions_result.csv"
        df_result.to_csv(output_file, index=False)

        # Return simplified output for display and path to full file
        return df_display, info_msg, stats, output_file

    except Exception as e:
        error_msg = f"Error processing file:\n{str(e)}"
        import traceback
        error_msg += f"\n\n{traceback.format_exc()}"
        return None, error_msg, None, None


# Create Gradio interface
with gr.Blocks(title="Exoplanet Detector", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# Exoplanet Detector

Upload a CSV file with data about exoplanet candidates (KOI - Kepler Objects of Interest).

**How it works:**
1. Upload your dataset with any column structure
2. Llama automatically maps your columns to the training columns
3. The model predicts each object as an exoplanet or a false positive

**Model:** Random Forest Classifier
**Mapping:** Llama 3.3 70B via Together AI

**Note:** Processing large datasets (>1000 rows) may take several minutes.
""")

    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload CSV file",
                file_types=[".csv"],
                type="filepath"
            )
            submit_btn = gr.Button("Run Prediction", variant="primary", size="lg")

        with gr.Column(scale=2):
            mapping_info = gr.Textbox(
                label="Column Mapping Information",
                lines=15,
                max_lines=20
            )

    with gr.Row():
        stats_output = gr.Markdown(label="Statistics")

    with gr.Row():
        results_output = gr.Dataframe(
            label="Prediction Results (main columns)",
            wrap=True,
            interactive=False
        )

    with gr.Row():
        download_output = gr.File(
            label="Download full result with all columns",
            interactive=False
        )

    # Event handler
    submit_btn.click(
        fn=predict_exoplanets,
        inputs=[file_input],
        outputs=[results_output, mapping_info, stats_output, download_output]
    )
| gr.Markdown(""" | |
| --- | |
| ### Tips: | |
| - Make sure your CSV file contains data about stellar systems and their characteristics | |
| - The more columns match the training dataset, the more accurate the predictions will be | |
| - Model trained on NASA Exoplanet Archive data (Kepler Mission) | |
| ### Example training dataset columns: | |
| `koi_period`, `koi_depth`, `koi_prad`, `koi_teq`, `koi_insol`, `koi_steff`, `koi_slogg`, `koi_srad`, `ra`, `dec`, `koi_kepmag` etc. | |
| """) | |

# Launch application
if __name__ == "__main__":
    demo.launch(share=False, server_name="0.0.0.0", server_port=7860)