wd666/orchestrator/research_manager.py

106 lines
4.7 KiB
Python

from typing import List, Dict, Generator
from dataclasses import dataclass
from agents.research_agent import ResearchAgent
from utils.llm_client import LLMClient
import config
@dataclass
class ResearchConfig:
topic: str
context: str = ""
# Dynamic list of experts: [{"name": "Expert 1", "model": "gpt-4o", "role": "analyst"}, ...]
experts: List[Dict[str, str]] = None
class ResearchManager:
"""Manages the Multi-Model Council workflow"""
def __init__(self, api_key: str, base_url: str = None, provider: str = "aihubmix"):
self.api_key = api_key
self.base_url = base_url
self.provider = provider
self.agents = []
def _get_client(self, model: str) -> LLMClient:
return LLMClient(
provider=self.provider, # Configured to respect provider or default to aihubmix logic inside client
api_key=self.api_key,
base_url=self.base_url,
model=model
)
def create_agents(self, config: ResearchConfig):
"""Initialize agents with specific models from config"""
self.agents = []
if config.experts:
for idx, expert_conf in enumerate(config.experts):
# Assign role based on position or config
# First agents are discussion members, last one is Synthesizer usually,
# but for equality we treat them all as members until the end.
# We'll assign a generic "member" role or specific if provided.
role_type = "council_member"
# If it's the last one, maybe give them synthesizer duty?
# For now, all are members, and we explicitly pick one for synthesis.
agent = ResearchAgent(
role=role_type,
llm_client=self._get_client(expert_conf["model"]),
name=expert_conf.get("name", f"Expert {idx+1}")
)
self.agents.append(agent)
def collaborate(self, topic: str, context: str, max_rounds: int = 3) -> Generator[Dict[str, str], None, None]:
"""
Execute the collaborative research process with multi-round discussion:
1. Conversation Loop (All Experts Round Robin)
2. Final Synthesis (Last Expert)
"""
conversation_history = []
discussion_context = f"Topic: '{topic}'\nBackground Context: {context}\n\n"
# Round-Robin Discussion
for round_num in range(1, max_rounds + 1):
for agent in self.agents:
yield {"type": "step_start", "step": f"Round {round_num}: {agent.name}", "agent": agent.name, "model": agent.model_name}
# Construct prompt
if round_num == 1 and not conversation_history:
prompt = f"You are {agent.name}. You are starting the discussion on '{topic}'. Provide your initial analysis and key points. Be conversational but substantive."
else:
prompt = f"You are {agent.name}. Review the discussion so far. Respond to previous points. Defend your views or refine them. Keep the discussion moving towards a solution.\n\nDiscussion History:\n{_format_history(conversation_history)}"
response = ""
for chunk in agent.generate(prompt, context=discussion_context):
response += chunk
yield {"type": "content", "content": chunk}
conversation_history.append({"agent": agent.name, "content": response})
yield {"type": "step_end", "output": response}
# Final Synthesis by the LAST agent (or a specific designated one)
synthesizer = self.agents[-1]
yield {"type": "step_start", "step": f"Final Synthesis ({synthesizer.name})", "agent": synthesizer.name, "model": synthesizer.model_name}
prompt_syn = f"""Synthesize the entire discussion into a final comprehensive plan for '{topic}'.
Discussion History:
{_format_history(conversation_history)}
IMPORTANT:
1. Reconcile the different viewpoints from all experts.
2. Provide a concrete action plan.
3. You MUST include a Mermaid.js diagram (using ```mermaid code block) to visualize the roadmap or process."""
findings_syn = ""
for chunk in synthesizer.generate(prompt_syn, context=discussion_context):
findings_syn += chunk
yield {"type": "content", "content": chunk}
yield {"type": "step_end", "output": findings_syn}
def _format_history(history: List[Dict[str, str]]) -> str:
formatted = ""
for turn in history:
formatted += f"[{turn['agent']}]: {turn['content']}\n\n"
return formatted