Source code for aurora_swarm.patterns.blackboard

"""Pattern 4 — Blackboard (Shared-State Swarm).

Agents collaborate through a shared mutable workspace divided into named
sections.  The session runs in rounds.  Each round every agent reads the
current board, a role-specific prompt function generates a customised
prompt, and all agents respond in parallel.
"""

from __future__ import annotations

import copy
from typing import Any, Callable

from aurora_swarm.pool import AgentPool


# Type aliases
BoardState = dict[str, list[str]]
PromptFn = Callable[[str, BoardState], str]
ConvergenceFn = Callable[[BoardState], bool]


[docs] class Blackboard: """Shared-state workspace for multi-round agent collaboration. Parameters ---------- sections: Names of the board sections (e.g. ``["hypotheses", "critiques"]``). prompt_fn: ``prompt_fn(role, board_state) -> str`` — generates the prompt that an agent with the given *role* should receive, given the current board contents. """ def __init__( self, sections: list[str], prompt_fn: PromptFn, ) -> None: self._board: BoardState = {section: [] for section in sections} self._prompt_fn = prompt_fn self._round = 0 # -- public API ---------------------------------------------------------- @property def board(self) -> BoardState: """Current board state (mutable reference).""" return self._board @property def round(self) -> int: """Number of completed rounds.""" return self._round
[docs] def snapshot(self) -> dict[str, Any]: """Return a serialisable deep copy of the board state.""" return { "round": self._round, "board": copy.deepcopy(self._board), }
[docs] async def run( self, pool: AgentPool, max_rounds: int = 10, convergence_fn: ConvergenceFn | None = None, ) -> BoardState: """Execute rounds until *max_rounds* or convergence. Agent roles are determined by the ``role`` tag on each endpoint. Agents whose ``role`` matches a board section contribute to that section. Agents with no ``role`` tag or a role not in the board sections are skipped. Parameters ---------- pool: Agent pool with role-tagged endpoints. max_rounds: Upper bound on the number of rounds. convergence_fn: Optional ``convergence_fn(board_state) -> bool``. If it returns ``True`` after a round the session stops early. Returns ------- BoardState The final board. """ sections = list(self._board.keys()) for _ in range(max_rounds): for section in sections: sub = pool.by_tag("role", section) if sub.size == 0: continue prompt = self._prompt_fn(section, self._board) responses = await sub.broadcast_prompt(prompt) for r in responses: if r.success: self._board[section].append(r.text) self._round += 1 if convergence_fn is not None and convergence_fn(self._board): break return self._board