"""
Module for mapping the columns of an uploaded dataset onto the columns of the
training dataset using Llama via the Together API.
"""

import ast
import re

import pandas as pd
from together import Together

# Sexagesimal coordinate patterns, e.g. '07h29m25.85s' (RA) and '-45d30m15.5s' (Dec)
_HMS_PATTERN = re.compile(r'(\d+)h(\d+)m([\d.]+)s?')
_DMS_PATTERN = re.compile(r'([+-]?)(\d+)d(\d+)m([\d.]+)s?')

# Catalogue prefixes ignored when comparing column names
_NAME_PREFIXES = ('pl_', 'st_', 'sy_', 'koi_')


def _strip_prefixes(name: str) -> str:
    """Remove the catalogue prefixes wherever they occur in the name."""
    for prefix in _NAME_PREFIXES:
        name = name.replace(prefix, '')
    return name


def convert_coordinates_to_degrees(value):
    """
    Convert coordinates from HMS/DMS format to decimal degrees.

    Examples:
        '07h29m25.85s' -> 112.357708 degrees
        '45d30m15.5s'  -> 45.504306 degrees
    """
    if pd.isna(value) or isinstance(value, (int, float)):
        return value

    value_str = str(value).strip()

    # HMS format (hours, minutes, seconds) for RA
    hms_match = _HMS_PATTERN.match(value_str)
    if hms_match:
        hours = float(hms_match.group(1))
        minutes = float(hms_match.group(2))
        seconds = float(hms_match.group(3))
        # 1h = 15 deg, 1m = 1/4 deg, 1s = 1/240 deg
        return hours * 15 + minutes / 4 + seconds / 240

    # DMS format (degrees, arcminutes, arcseconds) for Dec
    dms_match = _DMS_PATTERN.match(value_str)
    if dms_match:
        sign = -1 if dms_match.group(1) == '-' else 1
        degrees = float(dms_match.group(2))
        minutes = float(dms_match.group(3))
        seconds = float(dms_match.group(4))
        return sign * (degrees + minutes / 60 + seconds / 3600)

    # A plain numeric string is assumed to be decimal degrees already;
    # anything else is unrecognized and becomes NaN
    try:
        return float(value_str)
    except ValueError:
        return float('nan')


class ColumnMapper:
    def __init__(self, api_key: str):
        """
        Initialize column mapper

        Args:
            api_key: API key for Together AI
        """
        self.client = Together(api_key=api_key)

        # Built-in synonym dictionary (fallback) - significantly expanded
        self.known_synonyms = {
            # Orbital period
            'pl_orbper': 'koi_period',
            'orbital_period': 'koi_period',
            'period': 'koi_period',
            'pl_orbpererr1': 'koi_period_err1',
            'pl_orbpererr2': 'koi_period_err2',
            'pl_orbpererr': 'koi_period_err1',

            # Transit time/epoch
            'pl_tranmid': 'koi_time0bk',
            'transit_time': 'koi_time0bk',
            'time0': 'koi_time0bk',
            'epoch': 'koi_time0bk',
            'pl_tranmiderr1': 'koi_time0bk_err1',
            'pl_tranmiderr2': 'koi_time0bk_err2',

            # Transit duration
            'pl_trandur': 'koi_duration',
            'pl_trandurh': 'koi_duration',
            'transit_duration': 'koi_duration',
            'duration': 'koi_duration',
            'pl_trandurerr1': 'koi_duration_err1',
            'pl_trandurerr2': 'koi_duration_err2',

            # Transit depth
            'pl_trandep': 'koi_depth',
            'transit_depth': 'koi_depth',
            'depth': 'koi_depth',
            'pl_trandeperr1': 'koi_depth_err1',
            'pl_trandeperr2': 'koi_depth_err2',

            # Planet radius
            # NOTE: pl_radj is in Jupiter radii while koi_prad is in Earth radii;
            # no unit conversion is applied here.
            'pl_rade': 'koi_prad',
            'pl_radj': 'koi_prad',
            'planet_radius': 'koi_prad',
            'radius': 'koi_prad',
            'pl_radeerr1': 'koi_prad_err1',
            'pl_radeerr2': 'koi_prad_err2',
            'pl_radjerr1': 'koi_prad_err1',
            'pl_radjerr2': 'koi_prad_err2',

            # Insolation flux
            'pl_insol': 'koi_insol',
            'insolation': 'koi_insol',
            'insol': 'koi_insol',
            'pl_insolerr1': 'koi_insol_err1',
            'pl_insolerr2': 'koi_insol_err2',

            # Equilibrium temperature
            'pl_eqt': 'koi_teq',
            'equilibrium_temp': 'koi_teq',
            'teq': 'koi_teq',
            'pl_eqterr1': 'koi_teq_err1',
            'pl_eqterr2': 'koi_teq_err2',

            # Stellar effective temperature
            'st_teff': 'koi_steff',
            'stellar_teff': 'koi_steff',
            'star_temp': 'koi_steff',
            'teff': 'koi_steff',
            'st_tefferr1': 'koi_steff_err1',
            'st_tefferr2': 'koi_steff_err2',

            # Stellar surface gravity
            'st_logg': 'koi_slogg',
            'stellar_logg': 'koi_slogg',
            'surface_gravity': 'koi_slogg',
            'logg': 'koi_slogg',
            'st_loggerr1': 'koi_slogg_err1',
            'st_loggerr2': 'koi_slogg_err2',

            # Stellar radius
            'st_rad': 'koi_srad',
            'stellar_radius': 'koi_srad',
            'star_radius': 'koi_srad',
            'st_raderr1': 'koi_srad_err1',
            'st_raderr2': 'koi_srad_err2',

            # Stellar mass
            'st_mass': 'koi_smass',
            'stellar_mass': 'koi_smass',
            'star_mass': 'koi_smass',
            'st_masserr1': 'koi_smass_err1',
            'st_masserr2': 'koi_smass_err2',

            # Kepler magnitude
            'sy_kepmag': 'koi_kepmag',
            'kepmag': 'koi_kepmag',
            'kep_mag': 'koi_kepmag',
            'sy_kepmaglim': 'koi_kepmag',

            # Coordinates
            'ra': 'ra',
            'ra_deg': 'ra',
            'rastr': 'ra',
            'dec': 'dec',
            'dec_deg': 'dec',
            'decstr': 'dec',

            # Model SNR
            'koi_model_snr': 'koi_model_snr',
            'snr': 'koi_model_snr',

            # Impact parameter
            'pl_imppar': 'koi_impact',
            'impact': 'koi_impact',
            'impact_parameter': 'koi_impact',

            # Additional mappings for error columns
            'koi_period_err': 'koi_period_err1',
            'koi_time0bk_err': 'koi_time0bk_err1',
            'koi_duration_err': 'koi_duration_err1',
            'koi_depth_err': 'koi_depth_err1',
            'koi_prad_err': 'koi_prad_err1',
            'koi_teq_err': 'koi_teq_err1',
            'koi_insol_err': 'koi_insol_err1',
            'koi_steff_err': 'koi_steff_err1',
            'koi_slogg_err': 'koi_slogg_err1',
            'koi_srad_err': 'koi_srad_err1',
            'koi_smass_err': 'koi_smass_err1',
        }

    def get_column_mapping(self, source_columns: list, target_columns: list) -> dict:
        """
        Get the mapping between source columns and target columns using the LLM,
        then fill the gaps from the built-in synonym dictionary.

        Args:
            source_columns: List of columns in the uploaded dataset
            target_columns: List of columns in the training dataset

        Returns:
            Mapping dictionary {source_column: target_column}
        """
        # Known synonyms passed to the model as its primary reference
        known_mappings = """
        Common column name mappings (NASA Exoplanet Archive):
        - pl_orbper, orbital_period, period → koi_period (Orbital Period in days)
        - pl_tranmid, transit_time, time0 → koi_time0bk (Transit Epoch in BJD)
        - pl_trandur, pl_trandurh, transit_duration → koi_duration (Transit Duration in hours)
        - pl_trandep, transit_depth, depth → koi_depth (Transit Depth in ppm)
        - pl_rade, planet_radius, radius → koi_prad (Planetary Radius in Earth radii)
        - pl_insol, insolation, insol → koi_insol (Insolation Flux in Earth flux)
        - pl_eqt, equilibrium_temp, teq → koi_teq (Equilibrium Temperature in K)
        - st_teff, stellar_teff, star_temp → koi_steff (Stellar Effective Temperature in K)
        - st_logg, stellar_logg, surface_gravity → koi_slogg (Stellar Surface Gravity in log10(cm/s^2))
        - st_rad, stellar_radius, star_radius → koi_srad (Stellar Radius in Solar radii)
        - st_mass, stellar_mass, star_mass → koi_smass (Stellar Mass in Solar masses)
        - ra, ra_deg → ra (Right Ascension in degrees)
        - dec, dec_deg → dec (Declination in degrees)
        - pl_bmassj, planet_mass, sy_dist, distance → do not map (no equivalent quantity among the mappings above)
        """

        prompt = f"""You are an expert in NASA Exoplanet Archive data mapping. Map column names from a source dataset to Kepler/KOI target dataset columns.

{known_mappings}

Source columns: {source_columns}
Target columns: {target_columns}

CRITICAL INSTRUCTIONS:
1. Use the known mappings above as your PRIMARY reference
2. Match columns based on physical meaning (e.g., "pl_orbper" = orbital period = "koi_period")
3. Common prefixes: "pl_" = planet property, "st_" = stellar property, "koi_" = KOI property
4. If exact match exists in known mappings, USE IT
5. Only map columns with clear semantic similarity
6. Return ONLY a Python dictionary: {{"source": "target", ...}}
7. NO markdown, NO explanations, NO code blocks - just the dictionary

Example: {{"pl_orbper": "koi_period", "st_teff": "koi_steff", "ra": "ra"}}

Mapping:"""

        response = self.client.chat.completions.create(
            model="meta-llama/Llama-3.3-70B-Instruct-Turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=2000
        )

        mapping_str = response.choices[0].message.content.strip()

        # Strip markdown code fences if the model added them anyway
        if "```" in mapping_str:
            mapping_str = mapping_str.split("```")[1]
            if mapping_str.startswith("python"):
                mapping_str = mapping_str[6:]
            mapping_str = mapping_str.strip()

        # Parse the string into a dictionary. ast.literal_eval accepts only
        # Python literals, so model output is never executed as code.
        try:
            mapping = ast.literal_eval(mapping_str)
            if not isinstance(mapping, dict):
                raise ValueError("Response is not a dictionary")
        except Exception as e:
            print(f"Error parsing mapping: {e}")
            print(f"Raw response: {mapping_str}")
            # Fall back to an empty mapping on error
            mapping = {}

        # Keep only pairs that refer to real source and target columns
        mapping = {
            src: tgt for src, tgt in mapping.items()
            if src in source_columns and tgt in target_columns
        }

        # Supplement mapping with known synonyms (fallback)
        # Check source columns that were not mapped by Llama
        unmapped_sources = [col for col in source_columns if col not in mapping]

        for src_col in unmapped_sources:
            src_lower = src_col.lower()

            # Check exact match with known synonyms
            if src_lower in self.known_synonyms:
                target = self.known_synonyms[src_lower]
                if target in target_columns:
                    mapping[src_col] = target
                    continue

            # Check for partial matches on the name with prefixes removed
            src_clean = _strip_prefixes(src_lower)
            if not src_clean:
                # An empty core name would match every synonym
                continue

            for known_src, known_tgt in self.known_synonyms.items():
                known_clean = _strip_prefixes(known_src)

                # Check if one core name contains the other (covers equality)
                if known_clean in src_clean or src_clean in known_clean:
                    if known_tgt in target_columns:
                        mapping[src_col] = known_tgt
                        break

            # If still not mapped, try fuzzy matching on target columns.
            # This is a loose substring test and may produce false positives
            # for short target names such as 'ra'.
            if src_col not in mapping:
                for tgt_col in target_columns:
                    tgt_clean = tgt_col.replace('koi_', '')
                    # Check if source contains target name
                    if tgt_clean in src_lower or src_clean == tgt_clean:
                        mapping[src_col] = tgt_col
                        break

        return mapping

    def apply_mapping(self, df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
        """
        Apply the mapping to a dataframe.

        Args:
            df: Source dataframe
            mapping: Mapping dictionary

        Returns:
            Dataframe with renamed columns
        """
        df_mapped = df.copy()

        # Rename only the mapped columns that actually exist in the dataframe
        valid_mapping = {k: v for k, v in mapping.items() if k in df.columns}

        if valid_mapping:
            df_mapped = df_mapped.rename(columns=valid_mapping)

        # Ensure all columns are properly flattened and converted to numeric where possible
        for col in df_mapped.columns:
            try:
                col_data = df_mapped[col]

                # Duplicate column names (two sources renamed to the same
                # target) yield a DataFrame here; leave those untouched
                if not isinstance(col_data, pd.Series):
                    continue

                # Object columns are coerced to numeric; non-numeric values become NaN
                if col_data.dtype == 'object':
                    df_mapped[col] = pd.to_numeric(col_data, errors='coerce')

                # Ensure column is 1D
                col_values = col_data.values
                if col_values.ndim > 1:
                    # Flatten multi-dimensional arrays
                    df_mapped[col] = col_values.flatten()[:len(df_mapped)]
            except Exception:
                # Skip problematic columns
                continue

        return df_mapped

    def map_dataset(self, uploaded_df: pd.DataFrame, target_columns: list) -> tuple:
        """
        Run the full dataset mapping process.

        Args:
            uploaded_df: Uploaded dataframe
            target_columns: List of columns in the training dataset

        Returns:
            Tuple (mapped_dataframe, mapping_dict, info_message)
        """
        # Copy the dataframe so the original is not modified
        df_work = uploaded_df.copy()

        # Clean up column names - remove surrounding whitespace
        df_work.columns = [str(col).strip() for col in df_work.columns]

        # Handle any multi-dimensional columns before mapping
        for col in df_work.columns:
            if df_work[col].dtype == 'object':
                # Check if column contains complex structures
                non_null = df_work[col].dropna()
                first_val = non_null.iloc[0] if len(non_null) > 0 else None

                if isinstance(first_val, (list, tuple)):
                    # Flatten lists/tuples - take first element (None if empty)
                    df_work[col] = df_work[col].apply(
                        lambda x: (x[0] if len(x) > 0 else None)
                        if isinstance(x, (list, tuple)) else x
                    )
                elif isinstance(first_val, str):
                    # Convert string representations of numbers; leave the
                    # column unchanged if any value is not numeric
                    try:
                        df_work[col] = pd.to_numeric(df_work[col])
                    except (ValueError, TypeError):
                        pass

        # Convert coordinates to degrees if they are stored as text
        coord_keywords = ['ra', 'dec', 'coord', 'right_ascension', 'declination', 'rastr', 'decstr']
        coord_columns = [
            col for col in df_work.columns
            if any(keyword in col.lower() for keyword in coord_keywords)
        ]

        for col in coord_columns:
            # Check first non-empty value
            non_null = df_work[col].dropna()
            first_val = non_null.iloc[0] if len(non_null) > 0 else None

            # The keyword test above is loose ('ra' also matches 'pl_trandur'),
            # so convert only when the value really looks like HMS/DMS
            if isinstance(first_val, str) and (
                _HMS_PATTERN.match(first_val.strip())
                or _DMS_PATTERN.match(first_val.strip())
            ):
                # Convert entire column
                df_work[col] = df_work[col].apply(convert_coordinates_to_degrees)

        source_columns = df_work.columns.tolist()

        # Get mapping via LLM
        mapping = self.get_column_mapping(source_columns, target_columns)

        # Apply mapping
        mapped_df = self.apply_mapping(df_work, mapping)

        # Create info message
        if mapping:
            info_msg = f"Successfully mapped {len(mapping)} columns:\n"
            for src, tgt in mapping.items():
                info_msg += f"  * {src} -> {tgt}\n"
        else:
            info_msg = "Warning: No mapping performed - no matches found between columns\n"
            info_msg += f"Source columns: {', '.join(source_columns[:5])}...\n"

        # Check which target columns are missing
        missing_cols = set(target_columns) - set(mapped_df.columns)
        if missing_cols:
            info_msg += f"\nWarning: Missing {len(missing_cols)} target columns (will be filled with zeros)\n"

        return mapped_df, mapping, info_msg


def load_training_columns(csv_path: str) -> list:
    """
    Load column names from training dataset

    Args:
        csv_path: Path to training dataset CSV file

    Returns:
        List of column names
    """
    df = pd.read_csv(csv_path, comment='#', nrows=1)
    return df.columns.tolist()
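

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module.
#
# The synonym table sends several source names to one target (for example both
# 'pl_rade' and 'pl_radj' go to 'koi_prad'), so a mapping can rename two source
# columns to the same target name. The helper below is a minimal way to spot
# such collisions before renaming. `find_target_collisions` is a hypothetical
# name introduced here for illustration; how a collision should be resolved
# (keep the first source, prefer matching units, and so on) is left open.
# ---------------------------------------------------------------------------
def find_target_collisions(mapping: dict) -> dict:
    """Return {target: [sources, ...]} for every target claimed by two or more sources."""
    sources_by_target = {}
    for src, tgt in mapping.items():
        # Group source columns by the target they map to, in insertion order
        sources_by_target.setdefault(tgt, []).append(src)
    return {tgt: srcs for tgt, srcs in sources_by_target.items() if len(srcs) > 1}


if __name__ == "__main__":
    # Hypothetical mapping in which two radius columns compete for one target
    demo_mapping = {
        'pl_rade': 'koi_prad',
        'pl_radj': 'koi_prad',
        'pl_orbper': 'koi_period',
    }
    print(find_target_collisions(demo_mapping))  # {'koi_prad': ['pl_rade', 'pl_radj']}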