import sys
sys.path.append('../core')

import os
import shutil
from datetime import datetime
import json
import re

# Assuming these imports are correctly set up in your project structure
from llm.llm import LLM
from input.problem import problem_input
from agent.problem_analysis import ProblemAnalysis
# from agent.method_ranking import MethodRanking
from agent.problem_modeling import ProblemModeling
from agent.task_decompse import TaskDecompose
from agent.task import Task
from agent.create_charts import Chart
from agent.coordinator import Coordinator
from utils.utils import read_json_file, write_json_file, write_text_file, json_to_markdown
from prompt.template import TASK_ANALYSIS_APPEND_PROMPT, TASK_FORMULAS_APPEND_PROMPT, TASK_MODELING_APPEND_PROMPT
from utils.generate_paper import generate_paper_from_json
# from utils.convert_format import markdown_to_latex  # Uncomment if needed
from prompt.constants import modeling_methods  # Used as the default method list for task formulas

def mkdir_output(path):
    """Creates the necessary output directories."""
    os.makedirs(path, exist_ok=True)
    os.makedirs(os.path.join(path, 'json'), exist_ok=True)
    os.makedirs(os.path.join(path, 'markdown'), exist_ok=True)
    os.makedirs(os.path.join(path, 'latex'), exist_ok=True)  # Assuming latex might be used later
    os.makedirs(os.path.join(path, 'code'), exist_ok=True)
    os.makedirs(os.path.join(path, 'usage'), exist_ok=True)
    os.makedirs(os.path.join(path, 'intermediate'), exist_ok=True)  # For intermediate coordinator state if needed
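
# How each subdirectory is used by this module (intermediate/ is reserved):
#   json/          final and intermediate paper snapshots (save_paper)
#   markdown/      rendered Markdown versions of the paper (save_paper)
#   latex/         LaTeX output (generate_paper)
#   code/          generated main{task_id}.py scripts, copied dataset, figures
#   usage/         LLM token-usage statistics (save_usage)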


class ModelingAgentSystem:
    """
    Manages the step-by-step generation of a mathematical modeling report.
    Allows for granular control over section generation and tracks progress.
    """

    def __init__(self, problem_path: str, config: dict, dataset_path: str, output_path: str, name: str):
        """
        Initializes the Modeling Agent System.

        Args:
            problem_path: Path to the problem description file (e.g., JSON).
            config: Dictionary containing configuration parameters (model_name, rounds, etc.).
            dataset_path: Path to the dataset directory associated with the problem.
            output_path: Path where generated outputs (json, md, code, etc.) will be saved.
            name: A unique name for this run/problem (used in filenames).
        """
        self.problem_path = problem_path
        self.config = config
        self.dataset_path = dataset_path
        self.output_path = output_path
        self.name = name

        # --- Essential State ---
        self.paper = {'tasks': []}  # Holds the final generated content
        self.completed_steps = set()  # Tracks completed step names
        self.planned_steps = []  # Dynamically updated list of step names
        self.dependencies = self._define_dependencies()  # Map of step -> prerequisites

        # --- LLM & Agents ---
        self.llm = LLM(config['model_name'])
        self.pa = ProblemAnalysis(self.llm)
        # self.mr = MethodRanking(self.llm)
        self.pm = ProblemModeling(self.llm)
        self.td = TaskDecompose(self.llm)
        self.task = Task(self.llm)
        self.chart = Chart(self.llm)
        self.coordinator = Coordinator(self.llm)  # Manages task dependencies and intermediate results

        # --- Intermediate Data (Populated during generation) ---
        self.problem_str: str | None = None
        self.problem: dict | None = None
        self.problem_type: str | None = None
        self.problem_year: str | None = None
        self.problem_analysis: str | None = None
        self.modeling_solution: str | None = None
        self.task_descriptions: list[str] | None = None
        self.order: list[int] | None = None  # Execution order of tasks
        self.with_code: bool = False

        # --- Setup ---
        mkdir_output(self.output_path)
        self._initialize_problem_and_steps()
        print(f"Initialization complete. Starting steps: {self.planned_steps}")
        print(f"Already completed: {self.completed_steps}")

    def _define_dependencies(self):
        """Defines the prerequisite steps for each generation step."""
        # Basic structure; expanded after task decomposition
        deps = {
            'Problem Background': [],
            'Problem Requirement': [],
            'Problem Analysis': ['Problem Background', 'Problem Requirement'],
            'High-Level Modeling': ['Problem Analysis'],
            'Task Decomposition': ['High-Level Modeling'],
            'Dependency Analysis': ['Task Decomposition'],  # Explicit dependency analysis step
            # Task dependencies will be added dynamically
        }
        return deps
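
    # After task decomposition, _update_dependencies_after_decomp() extends this
    # map with per-task entries. For example (hypothetical two-task run in which
    # Task 2 depends on Task 1 and code generation is enabled):
    #   'Task 2 Analysis': ['Task 2 Description', 'Dependency Analysis',
    #                       'Task 1 Subtask Outcome Analysis']
    #   'Task 2 Code': ['Task 2 Mathematical Modeling Process']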

    def _update_dependencies_after_decomp(self):
        """Updates dependencies for task-specific steps after decomposition and dependency analysis."""
        if not self.order:
            print("Warning: Task order not determined. Cannot update task dependencies.")
            return
        num_tasks = len(self.task_descriptions)
        for i in range(1, num_tasks + 1):
            task_id = str(i)
            task_prereqs = [f'Task {dep_id} Subtask Outcome Analysis' for dep_id in self.coordinator.DAG.get(task_id, [])]
            # 'Dependency Analysis' is a prerequisite for the *first* step of *any* task
            base_task_prereqs = ['Dependency Analysis'] + task_prereqs
            self.dependencies[f'Task {i} Description'] = ['Task Decomposition']  # Description comes directly from decomposition
            self.dependencies[f'Task {i} Analysis'] = [f'Task {i} Description'] + base_task_prereqs
            self.dependencies[f'Task {i} Preliminary Formulas'] = [f'Task {i} Analysis']
            self.dependencies[f'Task {i} Mathematical Modeling Process'] = [f'Task {i} Preliminary Formulas']
            if self.with_code:
                self.dependencies[f'Task {i} Code'] = [f'Task {i} Mathematical Modeling Process']
                self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Code']
            else:
                # Without code, interpretation depends directly on the modeling step
                self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Mathematical Modeling Process']
            self.dependencies[f'Task {i} Subtask Outcome Analysis'] = [f'Task {i} Solution Interpretation']
            self.dependencies[f'Task {i} Charts'] = [f'Task {i} Subtask Outcome Analysis']

    def _initialize_problem_and_steps(self):
        """Loads the problem input and sets up the initial state."""
        print("Loading problem input...")
        self.problem_str, self.problem = problem_input(self.problem_path, self.llm)
        filename = os.path.splitext(os.path.basename(self.problem_path))[0]
        if '_' in filename:
            self.problem_year, self.problem_type = filename.split('_')[:2]
        else:
            self.problem_type = 'X'
            self.problem_year = 'XXXX'
        self.paper['problem_background'] = self.problem['background']
        self.paper['problem_requirement'] = self.problem['problem_requirement']
        self.completed_steps.add('Problem Background')
        self.completed_steps.add('Problem Requirement')

        # Check both the problem spec and the explicit path for a dataset
        self.with_code = len(self.problem.get('dataset_path', '')) > 0 or len(self.dataset_path) > 0
        if self.with_code and os.path.exists(self.dataset_path):
            print(f"Copying dataset from {self.dataset_path} to {os.path.join(self.output_path, 'code')}")
            shutil.copytree(self.dataset_path, os.path.join(self.output_path, 'code'), dirs_exist_ok=True)
        elif self.with_code:
            print(f"Warning: Code execution expected, but dataset path '{self.dataset_path}' not found.")

        # Initial plan before task decomposition
        self.planned_steps = [
            'Problem Background',
            'Problem Requirement',
            'Problem Analysis',
            'High-Level Modeling',
            'Task Decomposition',
            'Dependency Analysis',  # Explicit step
        ]

    def _check_dependencies(self, step_name: str) -> bool:
        """Checks whether all prerequisites for a given step are completed."""
        if step_name not in self.dependencies:
            print(f"Warning: No dependency information defined for step '{step_name}'. Assuming runnable.")
            return True  # Or False, depending on desired strictness
        prerequisites = self.dependencies.get(step_name, [])
        for prereq in prerequisites:
            if prereq not in self.completed_steps:
                print(f"Dependency Error: Step '{step_name}' requires '{prereq}', which is not completed.")
                return False
        return True
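
    # For instance, _check_dependencies('Task 3 Analysis') returns True only once
    # 'Task 3 Description', 'Dependency Analysis', and every
    # 'Task k Subtask Outcome Analysis' for each k in the coordinator's DAG
    # entry for task 3 have all been completed.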

    def _update_planned_steps_after_decomp(self):
        """Adds all task-specific steps to the planned steps list."""
        if not self.task_descriptions or self.order is None:
            print("Error: Cannot update planned steps. Task decomposition or dependency analysis incomplete.")
            return
        task_step_templates = [
            'Description',
            'Analysis',
            'Preliminary Formulas',
            'Mathematical Modeling Process',
            'Code' if self.with_code else None,  # Add the code step only if needed
            'Solution Interpretation',
            'Subtask Outcome Analysis',
            'Charts',
        ]
        # Filter out the None template (for the no-code case)
        task_step_templates = [t for t in task_step_templates if t]
        new_task_steps = []
        # Add steps in the determined execution order
        for task_id_int in self.order:
            for template in task_step_templates:
                new_task_steps.append(f'Task {task_id_int} {template}')
        # Append the new task steps after the 'Dependency Analysis' step
        dep_analysis_index = self.planned_steps.index('Dependency Analysis')
        self.planned_steps = self.planned_steps[:dep_analysis_index + 1] + new_task_steps
        # Initialize the paper['tasks'] structure
        self.paper['tasks'] = [{} for _ in range(len(self.task_descriptions))]
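
    # With an execution order of, say, [2, 1], the plan is extended to:
    #   ..., 'Dependency Analysis',
    #   'Task 2 Description', 'Task 2 Analysis', ..., 'Task 2 Charts',
    #   'Task 1 Description', 'Task 1 Analysis', ..., 'Task 1 Charts'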

    # --- Getters ---
    def get_completed_steps(self) -> set:
        """Returns the set of names of completed steps."""
        return self.completed_steps

    def get_planned_steps(self) -> list:
        """Returns the list of names of planned steps (including completed ones)."""
        return self.planned_steps

    def get_paper(self) -> dict:
        """Returns the current state of the generated paper dictionary."""
        # Tasks are stored at index task_id - 1, so the list is in task-ID
        # order regardless of the execution order.
        return self.paper

    def save_paper(self, intermediate=False):
        """Saves the current paper state to files."""
        filename = f"{self.name}_intermediate_{datetime.now().strftime('%Y%m%d%H%M%S')}" if intermediate else self.name
        json_path = os.path.join(self.output_path, 'json', f"{filename}.json")
        md_path = os.path.join(self.output_path, 'markdown', f"{filename}.md")
        # latex_path = os.path.join(self.output_path, 'latex', f"{filename}.tex")  # Uncomment if needed
        write_json_file(json_path, self.paper)
        markdown_str = json_to_markdown(self.paper)
        write_text_file(md_path, markdown_str)
        # write_text_file(latex_path, markdown_to_latex(markdown_str))  # Uncomment if needed
        print(f"Saved paper snapshot to {json_path} and {md_path}")

    def save_usage(self):
        """Saves the LLM usage statistics."""
        usage_path = os.path.join(self.output_path, 'usage', f"{self.name}.json")
        write_json_file(usage_path, self.llm.get_total_usage())
        print(f"Saved LLM usage to {usage_path}")
        print(f"Total Usage: {self.llm.get_total_usage()}")

    # --- Step Generation Methods ---
    def _generate_problem_analysis(self, user_prompt: str = '', round: int = 0):
        print("Generating: Problem Analysis")
        self.problem_analysis = self.pa.analysis(
            self.problem_str,
            round=round if round > 0 else self.config.get('problem_analysis_round', 0),
            user_prompt=user_prompt
        )
        self.paper['problem_analysis'] = self.problem_analysis
        print("Completed: Problem Analysis")

    def _generate_high_level_modeling(self, user_prompt: str = '', round: int = 0):
        print("Generating: High-Level Modeling")
        self.modeling_solution = self.pm.modeling(
            self.problem_str,
            self.problem_analysis,
            "",  # modeling_methods placeholder; load from constants if needed
            round=round if round > 0 else self.config.get('problem_modeling_round', 0),
            user_prompt=user_prompt
        )
        self.paper['high_level_modeling'] = self.modeling_solution  # Use a consistent key
        print("Completed: High-Level Modeling")

    def _generate_task_decomposition(self, user_prompt: str = ''):
        print("Generating: Task Decomposition")
        self.task_descriptions = self.td.decompose_and_refine(
            self.problem_str,
            self.problem_analysis,
            self.modeling_solution,
            self.problem_type,
            self.config.get('tasknum', 4),  # Default to 4 tasks if not specified
            user_prompt=user_prompt
        )
        # Add a summary of the decomposition to the paper
        self.paper['task_decomposition_summary'] = "\n".join(
            [f"Task {i + 1}: {desc}" for i, desc in enumerate(self.task_descriptions)]
        )
        print(f"Completed: Task Decomposition ({len(self.task_descriptions)} tasks)")
        # The planned steps are updated only after dependency analysis, once the
        # execution order is known (see _generate_dependency_analysis).

    def _generate_dependency_analysis(self):
        print("Generating: Dependency Analysis")
        self.order = self.coordinator.analyze_dependencies(
            self.problem_str,
            self.problem_analysis,
            self.modeling_solution,
            self.task_descriptions,
            self.with_code
        )
        self.order = [int(i) for i in self.order]  # Ensure integer IDs
        self.paper['task_execution_order'] = self.order  # Store the order
        self.paper['task_dependency_analysis'] = self.coordinator.task_dependency_analysis  # Store the rationale
        print(f"Completed: Dependency Analysis. Execution order: {self.order}")
        # Update planned steps and dependencies now that the order and DAG are known
        self._update_planned_steps_after_decomp()
        self._update_dependencies_after_decomp()
        print(f"Updated planned steps: {self.planned_steps}")

    def _generate_task_step(self, task_id: int, step_type: str, user_prompt: str = '', round: int = 0):
        """Handles generation for a specific step within a specific task."""
        print(f"Generating: Task {task_id} {step_type}")
        task_index = task_id - 1  # 0-based index
        # Ensure the task dictionary exists
        if task_index >= len(self.paper['tasks']):
            print(f"Error: Task index {task_index} out of bounds for self.paper['tasks'].")
            return False  # Indicate failure

        # --- Prepare common inputs for task steps ---
        task_description = self.task_descriptions[task_index]
        # Retrieve previously generated parts for this task, if they exist
        current_task_dict = self.paper['tasks'][task_index]
        task_analysis = current_task_dict.get('task_analysis')
        task_formulas = current_task_dict.get('preliminary_formulas')
        task_modeling = current_task_dict.get('mathematical_modeling_process')
        task_code = current_task_dict.get('task_code')
        execution_result = current_task_dict.get('execution_result')
        task_result = current_task_dict.get('solution_interpretation')

        # --- Construct Dependency Prompt ---
        task_dependency_ids = [int(i) for i in self.coordinator.DAG.get(str(task_id), [])]
        dependency_prompt = ""
        dependent_file_prompt = ""  # Specifically for the coding step
        if len(task_dependency_ids) > 0:
            # Fetch the dependency analysis rationale for the current task
            rationale = ""
            if self.coordinator.task_dependency_analysis and task_index < len(self.coordinator.task_dependency_analysis):
                rationale = self.coordinator.task_dependency_analysis[task_index]
            else:
                print(f"Warning: Could not find dependency rationale for Task {task_id}")
            dependency_prompt = (
                f"This task is Task {task_id}, which depends on the following tasks: {task_dependency_ids}. "
                f"The dependencies for this task are analyzed as follows: {rationale}\n"
            )
            for dep_id in task_dependency_ids:
                dep_task_index = dep_id - 1
                if dep_task_index < 0 or dep_task_index >= len(self.paper['tasks']):
                    print(f"Warning: Cannot build dependency prompt. Dependent Task {dep_id} data not found.")
                    continue
                dep_task_dict = self.paper['tasks'][dep_task_index]
                # Fall back to coordinator memory if the paper has not been updated yet
                # (this should not happen when dependency checks are enforced)
                dep_mem_dict = self.coordinator.memory.get(str(dep_id), {})
                dep_code_mem_dict = self.coordinator.code_memory.get(str(dep_id), {})
                dependency_prompt += f"---\n# The Description of Task {dep_id}:\n{dep_task_dict.get('task_description', dep_mem_dict.get('task_description', 'N/A'))}\n"
                dependency_prompt += f"# The modeling method for Task {dep_id}:\n{dep_task_dict.get('mathematical_modeling_process', dep_mem_dict.get('mathematical_modeling_process', 'N/A'))}\n"
                if self.with_code:
                    # Prefer the code structure recorded in the paper, then fall back to coordinator memory
                    code_structure = dep_task_dict.get('code_structure', dep_code_mem_dict)
                    code_structure_str = json.dumps(code_structure, indent=2) if code_structure else "{}"
                    dependency_prompt += f"# The structure of code for Task {dep_id}:\n{code_structure_str}\n"
                    dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n"
                    dependent_file_prompt += f"# The files generated by code for Task {dep_id}:\n{code_structure_str}\n"
                else:
                    dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n"

        # Append step-specific instructions to the dependency prompt
        task_analysis_prompt = dependency_prompt + TASK_ANALYSIS_APPEND_PROMPT if step_type == 'Analysis' else dependency_prompt
        task_formulas_prompt = dependency_prompt + TASK_FORMULAS_APPEND_PROMPT if step_type == 'Preliminary Formulas' else dependency_prompt
        task_modeling_prompt = dependency_prompt + TASK_MODELING_APPEND_PROMPT if step_type == 'Mathematical Modeling Process' else dependency_prompt
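
        # The assembled dependency_prompt has roughly this shape (illustrative):
        #   This task is Task 3, which depends on the following tasks: [1, 2]. ...
        #   ---
        #   # The Description of Task 1:
        #   ...
        #   # The modeling method for Task 1:
        #   ...
        #   # The result for Task 1:
        #   ...
        #   ---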

        # --- Execute Specific Step Logic ---
        success = True
        try:
            if step_type == 'Description':
                # The description comes directly from task_descriptions; just assign it
                self.paper['tasks'][task_index]['task_description'] = task_description
                # Store it in coordinator memory as well, for later prompt building
                self.coordinator.memory.setdefault(str(task_id), {})
                self.coordinator.memory[str(task_id)]['task_description'] = task_description
            elif step_type == 'Analysis':
                task_analysis = self.task.analysis(
                    task_analysis_prompt,  # Includes dependency info
                    task_description,
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['task_analysis'] = task_analysis
                self.coordinator.memory[str(task_id)]['task_analysis'] = task_analysis
            elif step_type == 'Preliminary Formulas':
                if not task_analysis:
                    raise ValueError(f"Task {task_id} Analysis is missing.")
                description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}'
                # Alternative: self.mr.top_methods(description_and_analysis, top_k=self.config.get('top_method_num', 6))
                top_modeling_methods = modeling_methods
                task_formulas = self.task.formulas(
                    task_formulas_prompt,  # Includes dependency info
                    self.problem.get('data_description', ''),
                    task_description,
                    task_analysis,
                    top_modeling_methods,
                    round=round if round > 0 else self.config.get('task_formulas_round', 0),
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['preliminary_formulas'] = task_formulas
                self.coordinator.memory[str(task_id)]['preliminary_formulas'] = task_formulas
            elif step_type == 'Mathematical Modeling Process':
                if not task_analysis or not task_formulas:
                    raise ValueError(f"Task {task_id} Analysis or Formulas missing.")
                task_modeling = self.task.modeling(
                    task_modeling_prompt,  # Includes dependency info
                    self.problem.get('data_description', ''),
                    task_description,
                    task_analysis,
                    task_formulas,
                    round=round if round > 0 else self.config.get('task_modeling_round', 0),
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['mathematical_modeling_process'] = task_modeling
                self.coordinator.memory[str(task_id)]['mathematical_modeling_process'] = task_modeling
            elif step_type == 'Code' and self.with_code:
                if not task_analysis or not task_formulas or not task_modeling:
                    raise ValueError(f"Task {task_id} Analysis, Formulas, or Modeling missing for coding.")
                code_template_path = os.path.join('../data/actor_data/input/code_template', f'main{task_id}.py')
                code_template = ""
                if os.path.exists(code_template_path):
                    with open(code_template_path, 'r') as f:
                        code_template = f.read()
                else:
                    print(f"Warning: Code template not found at {code_template_path}. Using empty template.")
                save_path = os.path.join(self.output_path, 'code', f'main{task_id}.py')
                work_dir = os.path.join(self.output_path, 'code')
                script_name = f'main{task_id}.py'
                dataset_input_path = self.problem.get('dataset_path') or self.dataset_path  # Prefer the path from the problem spec
                task_code, is_pass, execution_result = self.task.coding(
                    dataset_input_path,  # Use the actual dataset path
                    self.problem.get('data_description', ''),
                    self.problem.get('variable_description', ''),
                    task_description,
                    task_analysis,
                    task_formulas,
                    task_modeling,
                    dependent_file_prompt,  # Pass file dependency info
                    code_template,
                    script_name,
                    work_dir,
                    try_num=5,
                    round=round if round > 0 else 1,
                    user_prompt=user_prompt
                )
                code_structure = self.task.extract_code_structure(task_id, task_code, save_path)
                self.paper['tasks'][task_index]['task_code'] = '```Python\n' + task_code + '\n```'
                self.paper['tasks'][task_index]['is_pass'] = is_pass
                self.paper['tasks'][task_index]['execution_result'] = execution_result
                self.paper['tasks'][task_index]['code_structure'] = code_structure  # Store the structure in the paper
                # Update the coordinator's code memory as well
                self.coordinator.code_memory[str(task_id)] = code_structure
            elif step_type == 'Solution Interpretation':
                if not task_modeling:
                    raise ValueError(f"Task {task_id} Modeling is missing.")
                if self.with_code and execution_result is None:
                    raise ValueError(f"Task {task_id} Code execution result is missing.")
                task_result = self.task.result(
                    task_description,
                    task_analysis,
                    task_formulas,
                    task_modeling,
                    user_prompt=user_prompt,
                    execution_result=execution_result if self.with_code else ''
                )
                self.paper['tasks'][task_index]['solution_interpretation'] = task_result
                self.coordinator.memory[str(task_id)]['solution_interpretation'] = task_result
            elif step_type == 'Subtask Outcome Analysis':
                if not task_result:
                    raise ValueError(f"Task {task_id} Solution Interpretation is missing.")
                task_answer = self.task.answer(
                    task_description,
                    task_analysis,
                    task_formulas,
                    task_modeling,
                    task_result,
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['subtask_outcome_analysis'] = task_answer
                self.coordinator.memory[str(task_id)]['subtask_outcome_analysis'] = task_answer
            elif step_type == 'Charts':
                # Charts depend on the full task dictionary being available
                full_task_dict_str = json.dumps(self.paper['tasks'][task_index], indent=2)
                charts = self.chart.create_charts(
                    full_task_dict_str,
                    self.config.get('chart_num', 0),
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['charts'] = charts
                self.coordinator.memory[str(task_id)]['charts'] = charts  # Also saved to coordinator memory in case it is needed elsewhere
            else:
                print(f"Warning: Unknown step type '{step_type}' for Task {task_id}.")
                success = False
        except Exception as e:
            print(f"Error generating Task {task_id} {step_type}: {e}")
            import traceback
            traceback.print_exc()
            success = False  # Mark the step as failed
        if success:
            print(f"Completed: Task {task_id} {step_type}")
        return success

    # --- Main Generation Control ---
    def generate_step(self, step_name: str, user_prompt: str = '', round: int = 0, force_regenerate: bool = True) -> bool:
        """
        Generates the content for a specific step, checking dependencies first.

        Args:
            step_name: The name of the step to generate (e.g., 'Problem Analysis', 'Task 1 Preliminary Formulas').
            user_prompt: Optional user guidance to influence the generation.
            round: Number of improvement rounds to apply (where applicable).
            force_regenerate: If True, regenerate the step even if it is already completed.

        Returns:
            True if the step was generated successfully (or was already complete), False otherwise.
        """
        if step_name in self.completed_steps and not force_regenerate:
            print(f"Skipping already completed step: '{step_name}'")
            return True
        if step_name in self.completed_steps and force_regenerate:
            print(f"Regenerating step: '{step_name}'")
            # Remove the step from completed_steps to allow regeneration
            self.completed_steps.remove(step_name)
        if not self._check_dependencies(step_name):
            print(f"Cannot generate step '{step_name}' due to unmet dependencies.")
            return False

        # Dispatch to the appropriate generation method
        success = False
        try:
            if step_name == 'Problem Analysis':
                self._generate_problem_analysis(user_prompt, round)
                success = True
            elif step_name == 'High-Level Modeling':
                self._generate_high_level_modeling(user_prompt, round)
                success = True
            elif step_name == 'Task Decomposition':
                self._generate_task_decomposition(user_prompt)
                success = True  # Decomposition itself is done; planning/deps are updated later
            elif step_name == 'Dependency Analysis':
                self._generate_dependency_analysis()
                success = True  # The analysis itself is done
            elif step_name.startswith('Task '):
                # Parse the task ID and step type
                match = re.match(r"Task (\d+) (.*)", step_name)
                if match:
                    task_id = int(match.group(1))
                    step_type = match.group(2)
                    # Only generate task steps whose task ID is valid
                    if self.order and task_id in self.order:
                        success = self._generate_task_step(task_id, step_type, user_prompt, round)
                    elif not self.order:
                        print(f"Error: Cannot generate task step '{step_name}'. Task order not determined yet.")
                        success = False
                    else:
                        print(f"Error: Cannot generate task step '{step_name}'. Task ID {task_id} not found in execution order {self.order}.")
                        success = False
                else:
                    print(f"Error: Could not parse task step name: '{step_name}'")
                    success = False
            else:
                # Problem Background and Requirement are completed during initialization
                if step_name in ['Problem Background', 'Problem Requirement']:
                    print(f"Step '{step_name}' completed during initialization.")
                    success = True
                else:
                    print(f"Error: Unknown step name: '{step_name}'")
                    success = False
            if success:
                self.completed_steps.add(step_name)
                # Optional: save intermediate state after each successful step
                # self.save_paper(intermediate=True)
        except Exception as e:
            print(f"Critical error during generation of step '{step_name}': {e}")
            import traceback
            traceback.print_exc()
            success = False
        return success
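
    # Example of targeted regeneration (hypothetical step name and prompt):
    #   system.generate_step('Task 2 Preliminary Formulas',
    #                        user_prompt='Prefer a linear programming formulation.',
    #                        round=2)
    # Because force_regenerate defaults to True, this redoes the step even if it
    # was already completed; downstream steps are not invalidated automatically.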

    def run_sequential(self, force_regenerate_all: bool = False):
        """
        Runs the entire generation process sequentially, step by step.

        Args:
            force_regenerate_all: If True, regenerate all steps even if already completed.
        """
        print("Starting sequential generation...")
        current_step_index = 0
        # Clear completed steps if regenerating all
        if force_regenerate_all:
            print("Force regenerating all steps...")
            self.completed_steps.clear()
        while current_step_index < len(self.planned_steps):
            step_name = self.planned_steps[current_step_index]
            print(f"\n--- Attempting Step: {step_name} ({current_step_index + 1}/{len(self.planned_steps)}) ---")
            if step_name in self.completed_steps:
                print(f"Skipping already completed step: '{step_name}'")
                current_step_index += 1
                continue
            success = self.generate_step(step_name, force_regenerate=force_regenerate_all)
            if success:
                print(f"--- Successfully completed step: '{step_name}' ---")
                # If the number of planned steps increased (task decomposition and
                # dependency analysis add task steps), the loop condition
                # `current_step_index < len(self.planned_steps)` naturally iterates
                # through the newly added steps. We only need to advance to the
                # next step in the (possibly updated) list.
                current_step_index += 1
            else:
                print(f"--- Failed to complete step: '{step_name}'. Stopping generation. ---")
                break  # Stop processing if a step fails
        print("\nSequential generation process finished.")
        self.save_paper()  # Save the final result
        self.save_usage()
        print(f"Final paper saved for run '{self.name}' in '{self.output_path}'.")
        print(f"Completed steps: {self.completed_steps}")
        if current_step_index < len(self.planned_steps):
            print(f"Next planned step was: {self.planned_steps[current_step_index]}")

    def generate_paper(self, project_dir: str):
        """Renders the paper JSON into a LaTeX document, attaching figures and code."""
        metadata = {
            "team": "Agent",
            "year": self.problem_year,
            "problem_type": self.problem_type
        }
        json_file_path = f"{project_dir}/json/{self.problem_year}_{self.problem_type}.json"
        with open(json_file_path, 'w+') as f:
            json.dump(self.paper, f, indent=2)
        code_dir = f'{project_dir}/code'
        metadata['figures'] = [os.path.join(code_dir, f) for f in os.listdir(code_dir)
                               if f.lower().split('.')[-1] in ['png', 'jpg', 'jpeg']]
        metadata['codes'] = sorted([os.path.join(code_dir, f) for f in os.listdir(code_dir)
                                    if f.lower().split('.')[-1] == 'py'])
        generate_paper_from_json(self.llm, self.paper, metadata, os.path.join(project_dir, 'latex'), 'solution')


# --- Example Usage ---
def create_generator(name):
    """Helper function to set up configuration and create the agent system."""
    config = {
        'top_method_num': 6,
        'problem_analysis_round': 0,
        'problem_modeling_round': 0,
        'task_formulas_round': 0,
        'tasknum': 4,  # Default task number if not inferred
        'chart_num': 0,  # Set to > 0 to generate charts
        'model_name': 'gpt-4o-mini',  # Or your preferred model
        'method_name': 'MM-Agent-Refactored',  # Name for the experiment/output folder
    }
    # Adjust paths relative to the script location or use absolute paths
    base_data_path = '../data/actor_data'  # Adjust if necessary
    problem_file = os.path.join(base_data_path, 'input', 'problem', f'{name}.json')
    dataset_input_path = os.path.join(base_data_path, 'input', 'dataset', name)  # Path to check for dataset files
    output_dir = os.path.join(base_data_path, 'exps', config['method_name'])
    # Create a unique output path for this specific run
    run_output_path = os.path.join(output_dir, f"{name}_{datetime.now().strftime('%Y%m%d-%H%M%S')}")
    if not os.path.exists(problem_file):
        print(f"Error: Problem file not found at {problem_file}")
        return None
    # The output path itself is created inside the class constructor
    multi_agent = ModelingAgentSystem(
        problem_path=problem_file,
        config=config,
        dataset_path=dataset_input_path,  # Pass the specific dataset path
        output_path=run_output_path,
        name=name
    )
    return multi_agent
| if __name__ == "__main__": | |
| problem_name = "2024_C" # Example problem name | |
| agent_system = create_generator(problem_name) | |
| if agent_system: | |
| # --- Option 1: Run the whole process sequentially --- | |
| agent_system.run_sequential() | |
| # --- Option 2: Generate specific steps manually (Example) --- | |
| # print("\n--- Manual Step Generation Example ---") | |
| # # Assuming initialization is done in create_generator | |
| # agent_system.generate_step('Problem Analysis') | |
| # agent_system.generate_step('High-Level Modeling') | |
| # agent_system.generate_step('Task Decomposition') | |
| # agent_system.generate_step('Dependency Analysis') # Needed before task steps | |
| # # Now planned_steps and dependencies should be updated | |
| # print("Planned steps after decomp/dep analysis:", agent_system.get_planned_steps()) | |
| # print("Dependencies:", agent_system.dependencies) # View the updated dependencies | |
| # # Try generating the first step of the first task in the order | |
| # if agent_system.order: | |
| # first_task_id = agent_system.order[0] | |
| # agent_system.generate_step(f'Task {first_task_id} Description') | |
| # agent_system.generate_step(f'Task {first_task_id} Analysis') | |
| # # ... and so on | |
| # else: | |
| # print("Cannot run manual task steps, order not determined.") | |
| # print("\n--- Final State after Manual Steps ---") | |
| # print("Completed Steps:", agent_system.get_completed_steps()) | |
| # final_paper = agent_system.get_paper() | |
| # print("Generated Paper Content (summary):") | |
| # print(json.dumps(final_paper, indent=2, default=str)[:1000] + "\n...") # Print partial paper | |
| # agent_system.save_paper() # Save the result | |
| # agent_system.save_usage() | |
| # --- Option 3: Iterate using the provided loop structure --- | |
| # print("\n--- Iterative Generation Example ---") | |
| # current_step_index = 0 | |
| # while current_step_index < len(agent_system.planned_steps): | |
| # # Check if planned_steps changed during iteration | |
| # if current_step_index >= len(agent_system.planned_steps): | |
| # print("Reached end due to plan changes.") | |
| # break | |
| # step_name = agent_system.planned_steps[current_step_index] | |
| # print(f"\nAttempting step ({current_step_index+1}/{len(agent_system.planned_steps)}): {step_name}") | |
| # if step_name in agent_system.completed_steps: | |
| # print(f"Skipping already completed step: '{step_name}'") | |
| # current_step_index += 1 | |
| # continue | |
| # success = agent_system.generate_step(step_name) | |
| # if not success: | |
| # print(f"Failed on step: {step_name}. Stopping.") | |
| # break | |
| # # Increment index regardless of whether plan changed, | |
| # # the loop condition handles the updated length | |
| # current_step_index += 1 | |
| # print("\n--- Final State after Iterative Loop ---") | |
| # print("Completed Steps:", agent_system.get_completed_steps()) | |
| # final_paper = agent_system.get_paper() | |
| # # print("Generated Paper Content (full):") | |
| # # print(json.dumps(final_paper, indent=2, default=str)) | |
| # agent_system.save_paper() # Save the result | |
| # agent_system.save_usage() | |