> Multi-Agent Systems

Budding
planted Jan 8, 2026 · tended Jan 8, 2026
#ai-agents #multi-agent #collaboration #coordination

Multi-Agent Systems

🌿 Budding note — collaborative agent architectures.

Why Multiple Agents?

Single agents have limitations. Multiple specialized agents can:

  • Divide complex tasks into manageable subtasks
  • Specialize in specific domains
  • Validate each other's work
  • Scale horizontally
  • Simulate organizational structures

Related: AI Agents Fundamentals for single-agent basics

Coordination Patterns

1. Sequential Pipeline

Agents work in sequence, each building on the previous:

class SequentialPipeline:
    """Run agents one after another, feeding each output to the next agent.

    Args:
        agents: Ordered list of agent objects. Each must provide an awaitable
            ``process(value)`` method. A ``name`` attribute is optional and is
            only used for progress output.
    """

    def __init__(self, agents: list):
        self.agents = agents

    async def run(self, task: str):
        """Push ``task`` through every agent in order.

        Args:
            task: Initial input handed to the first agent.

        Returns:
            Whatever the last agent's ``process`` returned, or ``task``
            unchanged when the pipeline has no agents.
        """
        result = task

        for agent in self.agents:
            # Not every agent class defines ``name``; fall back to the class
            # name so progress logging never aborts the pipeline.
            label = getattr(agent, "name", type(agent).__name__)
            print(f"Agent {label} processing...")
            result = await agent.process(result)

        return result

# Example: Content pipeline
# Three stages run in list order: research, then writing, then editing.
# WriterAgent and EditorAgent are not shown in this note; each stage must
# provide the async process() method that SequentialPipeline.run awaits.
pipeline = SequentialPipeline([
    ResearchAgent(),
    WriterAgent(),
    EditorAgent()
])

# NOTE: a bare `await` is only valid inside a coroutine or an async-aware
# REPL/notebook; in a plain script, wrap this call in an `async def` and
# start it with asyncio.run().
article = await pipeline.run("Write about AI agents")

2. Parallel Collaboration

Agents work simultaneously and results are combined:

class ParallelCollaboration:
    """Fan a single task out to every agent at once and merge the answers."""

    def __init__(self, agents: list):
        self.agents = agents

    def synthesize(self, results: list) -> str:
        """Merge the agents' text outputs, separated by a blank line."""
        separator = "\n\n"
        return separator.join(results)

    async def run(self, task: str):
        """Hand ``task`` to all agents concurrently and return the merged text.

        Outputs are merged in the same order as ``self.agents``.
        """
        # One coroutine per agent; gather awaits them together.
        pending = (member.process(task) for member in self.agents)
        outputs = await asyncio.gather(*pending)

        return self.synthesize(outputs)

# Example: Research team
# All three searchers receive the same query; their outputs are joined by
# ParallelCollaboration.synthesize in the order listed here.
# The three agent classes are not shown in this note; each must provide the
# async process() method that ParallelCollaboration.run calls.
team = ParallelCollaboration([
    WebSearchAgent(),
    PaperSearchAgent(),
    ExpertConsultAgent()
])

# NOTE: a bare `await` is only valid inside a coroutine or an async-aware
# REPL/notebook; in a plain script, wrap this call in an `async def` and
# start it with asyncio.run().
research = await team.run("Latest AI developments")

3. Hierarchical (Manager-Worker)

Manager coordinates specialist workers:

class HierarchicalSystem:
    """A manager agent plans the work and hands each task to a specialist."""

    def __init__(self):
        self.manager = ManagerAgent()
        # Workers are looked up by the ``specialist`` field of each planned task.
        self.workers = dict(
            research=ResearchAgent(),
            code=CodingAgent(),
            test=TestingAgent(),
        )

    async def run(self, goal: str):
        """Plan ``goal``, execute every planned task in order, and summarize.

        Tasks run one at a time; after each one the manager is told the
        outcome. The return value is whatever the manager's ``synthesize``
        produces from the ``{task.id: result}`` mapping.
        """
        blueprint = await self.manager.plan(goal)

        outcomes = {}
        for step in blueprint.tasks:
            specialist = self.workers[step.specialist]
            outcome = await specialist.execute(step)
            outcomes[step.id] = outcome

            # Report progress before moving on to the next task.
            await self.manager.update_progress(step.id, outcome)

        summary = await self.manager.synthesize(outcomes)
        return summary

4. Democratic Voting

Agents vote on decisions:

class DemocraticSystem:
    """Every agent proposes a solution; the panel's average rating picks one."""

    def __init__(self, agents: list):
        self.agents = agents

    async def _vote(self, proposal: str) -> float:
        """Return the mean of all agents' ratings for ``proposal``."""
        ballots = await asyncio.gather(
            *(voter.rate_solution(proposal) for voter in self.agents)
        )
        return sum(ballots) / len(ballots)

    async def run(self, problem: str):
        """Collect one proposal per agent and return the best-rated one.

        Proposals are gathered concurrently, then scored one after another.
        When several proposals tie for the top score, the earliest one wins.
        """
        candidates = await asyncio.gather(
            *(member.propose_solution(problem) for member in self.agents)
        )

        # Pair each proposal with its average score: (score, proposal).
        tally = [(await self._vote(candidate), candidate) for candidate in candidates]

        # Stable descending sort on the score only, so ties keep proposal order.
        ranked = sorted(tally, key=lambda entry: entry[0], reverse=True)
        return ranked[0][1]

Communication Protocols

Message Passing

import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any

@dataclass
class Message:
    """Envelope for one message exchanged between agents."""
    sender: str  # ID of the agent that produced the message
    receiver: str  # ID of the agent whose mailbox the message is placed in
    content: Any  # Arbitrary payload; the bus never inspects it
    message_type: str  # "request", "response", "broadcast"
    timestamp: float  # Seconds since the epoch, as returned by time.time()


class MessageBus:
    """Central in-memory message broker with per-agent mailboxes.

    Attributes:
        mailboxes: Maps an agent ID to the list of messages waiting for it.
        subscribers: Maps a message type to the agent IDs subscribed to it,
            in subscription order and without duplicates.
    """

    def __init__(self):
        self.mailboxes = defaultdict(list)
        self.subscribers = defaultdict(list)

    def send(self, message: Message):
        """Queue ``message`` in the mailbox of ``message.receiver``."""
        self.mailboxes[message.receiver].append(message)

    def broadcast(self, message: Message):
        """Deliver a copy of ``message`` to every subscriber of its type.

        Each copy is addressed to the individual subscriber and stamped with
        the delivery time. Nothing happens when the type has no subscribers.
        """
        # .get() instead of indexing: a defaultdict lookup would insert an
        # empty list for every message type that nobody subscribed to.
        for agent_id in self.subscribers.get(message.message_type, ()):
            msg = Message(
                sender=message.sender,
                receiver=agent_id,
                content=message.content,
                message_type=message.message_type,
                timestamp=time.time(),
            )
            self.mailboxes[agent_id].append(msg)

    def receive(self, agent_id: str) -> list[Message]:
        """Return and clear all pending messages for ``agent_id``.

        Returns:
            The queued messages in arrival order, or an empty list when there
            are none.
        """
        # pop() drains the mailbox without leaving an empty entry behind for
        # agent IDs that are merely polled.
        return self.mailboxes.pop(agent_id, [])

    def subscribe(self, agent_id: str, message_type: str):
        """Subscribe ``agent_id`` to broadcasts of ``message_type``.

        Subscribing twice is a no-op, so an agent never receives the same
        broadcast more than once.
        """
        registered = self.subscribers[message_type]
        if agent_id not in registered:
            registered.append(agent_id)

Shared Memory

class SharedMemory:
    """Key/value store that all agents can read from and write to."""

    def __init__(self):
        self.data = {}
        self.locks = {}

    def _get_lock(self, key: str):
        """Return the lock guarding ``key``, creating it on first use."""
        try:
            return self.locks[key]
        except KeyError:
            fresh = asyncio.Lock()
            self.locks[key] = fresh
            return fresh

    async def read(self, key: str) -> Any:
        """Return the value stored under ``key``, or ``None`` if absent."""
        record = self.data.get(key, {})
        return record.get("value")

    async def write(self, key: str, value: Any, agent_id: str):
        """Store ``value`` under ``key`` while holding that key's lock.

        The stored record also notes who wrote it and when.
        """
        guard = self._get_lock(key)
        async with guard:
            record = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time(),
            }
            self.data[key] = record

Agent Roles

Specialized Agents

class ResearchAgent:
    """Collects material on a query from web and paper searches."""

    async def process(self, query: str) -> str:
        """Search the web, then papers, and return the combined synthesis.

        Relies on ``web_search``, ``paper_search`` and ``synthesize`` being
        provided on the instance; they are not defined in this class.
        """
        web_hits = await self.web_search(query)
        paper_hits = await self.paper_search(query)
        combined = self.synthesize(web_hits, paper_hits)
        return combined

class CriticAgent:
    """Evaluates a piece of work and reports what it found."""

    async def process(self, content: str) -> dict:
        """Return a review dict with ``errors``, ``improvements`` and ``rating``.

        The three checks run one after another in that order. They rely on
        ``find_errors``, ``suggest_improvements`` and ``rate_quality`` being
        provided on the instance; they are not defined in this class.
        """
        review = {}
        review["errors"] = await self.find_errors(content)
        review["improvements"] = await self.suggest_improvements(content)
        review["rating"] = await self.rate_quality(content)
        return review

class SynthesisAgent:
    """Merges several input texts into a single response via an LLM."""

    async def process(self, inputs: list[str]) -> str:
        """Ask ``self.llm`` to synthesize ``inputs`` and return its answer.

        ``self.llm`` is not set by this class; it must provide an awaitable
        ``generate(prompt)`` method.
        """
        generate = self.llm.generate
        # The list is interpolated with its default string representation.
        prompt = f"""
        Synthesize these perspectives into a coherent response:
        {inputs}
        """
        return await generate(prompt)

CrewAI Example

Framework for role-based collaboration:

from crewai import Agent, Task, Crew, Process

# Define specialized agents
# Each Agent is given a role, a goal, a backstory and a list of tools.
# NOTE(review): web_search, scraper and grammar_check are not defined in this
# snippet; they must be created or imported before it runs.
researcher = Agent(
    role='Senior Research Analyst',
    goal='Uncover cutting-edge developments in AI',
    backstory='Expert at finding and analyzing information',
    tools=[web_search, scraper],
    verbose=True
)

# The writer gets no tools of its own.
writer = Agent(
    role='Tech Content Strategist',
    goal='Craft compelling content about AI',
    backstory='Skilled at making complex topics accessible',
    tools=[],
    verbose=True
)

editor = Agent(
    role='Chief Editor',
    goal='Ensure content is polished and accurate',
    backstory='Meticulous attention to detail',
    tools=[grammar_check],
    verbose=True
)

# Define workflow
# One task per agent; `context` links each task to the task whose output it
# builds on, forming the chain research -> writing -> editing.
research_task = Task(
    description='Research latest AI agent developments',
    agent=researcher,
    expected_output='Detailed research notes'
)

writing_task = Task(
    description='Write engaging article from research',
    agent=writer,
    expected_output='Draft article',
    context=[research_task]  # Depends on research
)

editing_task = Task(
    description='Polish and perfect the article',
    agent=editor,
    expected_output='Final article',
    context=[writing_task]  # Depends on writing
)

# Create crew
# Agents and tasks are listed in pipeline order and the crew is configured
# with the sequential process.
crew = Crew(
    agents=[researcher, writer, editor],
    tasks=[research_task, writing_task, editing_task],
    process=Process.sequential
)

# Execute
# kickoff() starts the crew; its return value is kept in `result`.
result = crew.kickoff()

Related: Agent Frameworks Comparison for more frameworks

Conflict Resolution

When agents disagree:

class ConflictResolver:
    """Settles a disagreement between two agents by asking an arbiter."""

    def __init__(self, arbiter_agent):
        self.arbiter = arbiter_agent

    async def resolve(self, conflict: dict) -> dict:
        """Ask the arbiter to choose between two conflicting positions.

        ``conflict["arguments"]`` must hold the positions under the keys
        ``"agent_a"`` and ``"agent_b"``. Returns a dict with the arbiter's
        answer (``"resolution"``) and the completion time (``"resolved_at"``).
        """
        positions = conflict["arguments"]
        evaluate = self.arbiter.evaluate

        # Present both sides to the arbiter in a single prompt.
        prompt = f"""Two agents disagree:
            Agent A: {positions['agent_a']}
            Agent B: {positions['agent_b']}

            Which is correct and why?"""
        verdict = await evaluate(prompt)

        outcome = {"resolution": verdict}
        outcome["resolved_at"] = time.time()
        return outcome

Connection Points

Prerequisites:

Related: