Skip to content

Agent

stirrup.core.agent

AGENT_MAX_TURNS module-attribute

AGENT_MAX_TURNS = 30

CONTEXT_SUMMARIZATION_CUTOFF module-attribute

CONTEXT_SUMMARIZATION_CUTOFF = 0.7

FINISH_TOOL_NAME module-attribute

FINISH_TOOL_NAME = 'finish'

MESSAGE_SUMMARIZER module-attribute

MESSAGE_SUMMARIZER = read_text(encoding='utf-8')

MESSAGE_SUMMARIZER_BRIDGE_TEMPLATE module-attribute

MESSAGE_SUMMARIZER_BRIDGE_TEMPLATE = read_text(
    encoding="utf-8"
)

DEFAULT_TOOLS module-attribute

SIMPLE_FINISH_TOOL module-attribute

# Built-in "finish" tool: the model calls it to end a run, and the tool result
# simply echoes the reason supplied in the call arguments.
SIMPLE_FINISH_TOOL: Tool[
    FinishParams, ToolUseCountMetadata
] = Tool[FinishParams, ToolUseCountMetadata](
    name=FINISH_TOOL_NAME,
    description="Signal task completion with a reason. Use when the task is finished or cannot proceed further. Note that you will need a separate turn to finish.",
    parameters=FinishParams,
    # The reason has to be read off the params object: a bare `reason` is not
    # bound anywhere inside the lambda and would raise NameError when called.
    # NOTE(review): assumes FinishParams declares a `reason` field - confirm.
    executor=lambda params: ToolResult(
        content=params.reason, metadata=ToolUseCountMetadata()
    ),
)

_PARENT_DEPTH module-attribute

_PARENT_DEPTH: ContextVar[int] = ContextVar(
    "parent_depth", default=0
)

logger module-attribute

logger = getLogger(__name__)

_SESSION_STATE module-attribute

_SESSION_STATE: ContextVar[SessionState] = ContextVar(
    "session_state"
)

__all__ module-attribute

__all__ = ['Agent', 'SubAgentParams']

LOGGER module-attribute

LOGGER = getLogger(__name__)

DEFAULT_SUB_AGENT_DESCRIPTION module-attribute

DEFAULT_SUB_AGENT_DESCRIPTION = "A sub agent that can be used to handle a contained, specific task."

AGENT_NAME_PATTERN module-attribute

AGENT_NAME_PATTERN = compile('^[a-zA-Z0-9_-]{1,128}$')

ChatMessage

# Union of every message model. The concrete class is chosen from the value of
# each message's `role` field.
ChatMessage = Annotated[
    SystemMessage
    | UserMessage
    | AssistantMessage
    | ToolMessage,
    # The discriminator is the *name* of the tag field, so it must be a string
    # literal; a bare `role` is an undefined name at module import time.
    Field(discriminator="role"),
]

Discriminated union of all message types, automatically parsed based on role field.

AssistantMessage

Bases: BaseModel

LLM response message with optional tool calls and token usage tracking.

ImageContentBlock

Bases: BinaryContentBlock

Image content supporting PNG, JPEG, WebP, PSD formats with automatic downscaling.

to_base64_url

to_base64_url(
    max_pixels: int | None = RESOLUTION_1MP,
) -> str

Convert image to base64 data URL, optionally resizing to max pixel count.

Source code in src/stirrup/core/models.py
def to_base64_url(self, max_pixels: int | None = RESOLUTION_1MP) -> str:
    """Encode this image as a ``data:image/png;base64,...`` URL.

    Args:
        max_pixels: Pixel budget (width * height). An image above the budget
            is shrunk before encoding; None disables resizing.

    Returns:
        A PNG data URL holding the (possibly downscaled) image in RGB mode.
    """
    image: Image.Image = Image.open(BytesIO(self.data))

    over_budget = max_pixels is not None and image.width * image.height > max_pixels
    if over_budget:
        target_w, target_h = downscale_image(image.width, image.height, max_pixels)
        # The return value is ignored; the resized image is read from `image`.
        image.thumbnail((target_w, target_h), Image.Resampling.LANCZOS)

    # Normalise every other mode (palette, RGBA, greyscale, ...) to RGB.
    if image.mode != "RGB":
        image = image.convert("RGB")

    png_buffer = BytesIO()
    image.save(png_buffer, format="PNG")
    encoded = b64encode(png_buffer.getvalue()).decode()
    return f"data:image/png;base64,{encoded}"

LLMClient

Bases: Protocol

Protocol defining the interface for LLM client implementations.

Any LLM client must implement this protocol to work with the Agent class. Provides text generation with tool support and model capability inspection.

SubAgentMetadata

Bases: BaseModel

Metadata from sub-agent execution including token usage, message history, and child run metadata.

Implements Addable protocol to support aggregation across multiple subagent calls.

__add__

Combine metadata from multiple subagent calls.

Source code in src/stirrup/core/models.py
def __add__(self, other: "SubAgentMetadata") -> "SubAgentMetadata":
    """Merge two SubAgentMetadata values into a new one.

    Message histories are concatenated (self first, then other). Run metadata
    is merged key by key: lists under a shared key are concatenated, and keys
    present on only one side are carried over.
    """
    # Start from a shallow copy of our own mapping so `self` is not rebound.
    merged_runs: dict[str, list[Any]] = dict(self.run_metadata)
    for name, entries in other.run_metadata.items():
        merged_runs[name] = merged_runs[name] + entries if name in merged_runs else list(entries)

    return SubAgentMetadata(
        message_history=self.message_history + other.message_history,
        run_metadata=merged_runs,
    )

SystemMessage

Bases: BaseModel

System-level instructions and context for the LLM.

TokenUsage

Bases: BaseModel

Token counts for LLM usage (input, output, reasoning tokens).

total property

total: int

Total token count across input, output, and reasoning.

__add__

__add__(other: TokenUsage) -> TokenUsage

Add two TokenUsage objects together, summing each field independently.

Source code in src/stirrup/core/models.py
def __add__(self, other: "TokenUsage") -> "TokenUsage":
    """Return a new TokenUsage whose fields are the pairwise sums of both operands."""
    summed_input = self.input + other.input
    summed_output = self.output + other.output
    summed_reasoning = self.reasoning + other.reasoning
    return TokenUsage(input=summed_input, output=summed_output, reasoning=summed_reasoning)

Tool

Bases: BaseModel

Tool definition with name, description, parameter schema, and executor function.

Generic over

P: Parameter model type (must be a Pydantic BaseModel, or None for parameterless tools) M: Metadata type (should implement Addable for aggregation; use None for tools without metadata)

Tools are simple, stateless callables. For tools requiring lifecycle management (setup/teardown, resource pooling), use a ToolProvider instead.

Example with parameters

class CalcParams(BaseModel): expression: str

calc_tool = Tool[CalcParams, None](...)

Example without parameters

time_tool = Tool[None, None](...)

ToolCall

Bases: BaseModel

Represents a tool invocation request from the LLM.

Attributes:

Name Type Description
name str

Name of the tool to invoke

arguments str

JSON string containing tool parameters

tool_call_id str | None

Unique identifier for tracking this tool call and its result

ToolMessage

Bases: BaseModel

Tool execution result returned to the LLM.

ToolProvider

Bases: ABC

Abstract base class for tool providers with lifecycle management.

ToolProviders manage resources (HTTP clients, sandboxes, server connections) and return Tool instances when entering their async context. They implement the async context manager protocol.

Use ToolProvider for: - Tools requiring setup/teardown (connections, temp directories) - Tools that return multiple Tool instances (e.g., MCP servers) - Tools with shared state across calls (e.g., HTTP client pooling)

Example

class MyToolProvider(ToolProvider):
    async def __aenter__(self) -> Tool | list[Tool]:
        # Setup resources and return tool(s)
        return self._create_tool()

    # __aexit__ is optional - default is no-op

Agent automatically manages ToolProvider lifecycle via its session() context.

__aenter__ abstractmethod async

__aenter__() -> Tool | list[Tool]

Enter async context: setup resources and return tool(s).

Returns:

Type Description
Tool | list[Tool]

A single Tool instance, or a list of Tool instances for providers

Tool | list[Tool]

that expose multiple tools (e.g., MCP servers).

Source code in src/stirrup/core/models.py
@abstractmethod
async def __aenter__(self) -> "Tool | list[Tool]":
    """Enter async context: setup resources and return tool(s).

    Abstract: every concrete provider must override this method.

    Returns:
        A single Tool instance, or a list of Tool instances for providers
        that expose multiple tools (e.g., MCP servers).
    """
    ...

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit async context: cleanup resources. Default: no-op.

Source code in src/stirrup/core/models.py
async def __aexit__(  # noqa: B027
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit async context: cleanup resources. Default: no-op.

    Not marked abstract, so a provider with nothing to release can inherit
    this empty implementation instead of overriding it.

    Args:
        exc_type: Class of the exception raised inside the context, or None.
        exc_val: The exception instance raised inside the context, or None.
        exc_tb: Traceback of that exception, or None.
    """

ToolResult

Bases: BaseModel

Result from a tool executor with optional metadata.

Generic over metadata type M. M should implement Addable protocol for aggregation support, but this is not enforced at the class level due to Pydantic schema generation limitations.

UserMessage

Bases: BaseModel

User input message to the LLM.

CodeExecToolProvider

CodeExecToolProvider(
    *, allowed_commands: list[str] | None = None
)

Bases: ToolProvider, ABC

Abstract base class for code execution tool providers.

CodeExecToolProvider is a ToolProvider that manages code execution environments (sandboxes, containers, local temp directories) and returns a code_exec Tool.

Subclasses must implement: - `__aenter__()`: Initialize environment and return the code_exec tool - `__aexit__()`: Cleanup the execution environment - `run_command()`: Execute a command and return raw result - `read_file_bytes()`: Read file content as bytes from the environment - `write_file_bytes()`: Write bytes to a file in the environment

Default implementations are provided for: - save_output_files(): Save files to local dir or another exec env (uses primitives) - upload_files(): Upload files from local or another exec env (uses primitives)

All code execution providers support an optional allowlist of command patterns. If provided, only commands matching at least one pattern are allowed. If None, all commands are allowed.

Usage with Agent

from stirrup.clients.chat_completions_client import ChatCompletionsClient

client = ChatCompletionsClient(model="gpt-5")
agent = Agent(client=client, name="assistant", tools=[LocalCodeExecToolProvider(), CALCULATOR_TOOL])

Initialize execution environment with optional command allowlist.

Parameters:

Name Type Description Default
allowed_commands list[str] | None

Optional list of regex patterns. If provided, only commands matching at least one pattern are allowed. If None, all commands are allowed.

None
Source code in src/stirrup/tools/code_backends/base.py
def __init__(self, *, allowed_commands: list[str] | None = None) -> None:
    """Initialize execution environment with optional command allowlist.

    Args:
        allowed_commands: Optional list of regex patterns. If provided, only
                         commands matching at least one pattern are allowed.
                         If None, all commands are allowed.

    """
    self._allowed_commands = allowed_commands
    self._compiled_allowed: list[re.Pattern[str]] | None = None
    if allowed_commands is not None:
        self._compiled_allowed = [re.compile(p) for p in allowed_commands]

__aenter__ abstractmethod async

Enter async context: set up environment and return code_exec tool.

Source code in src/stirrup/tools/code_backends/base.py
@abstractmethod
async def __aenter__(self) -> Tool[CodeExecutionParams, ToolUseCountMetadata]:
    """Enter async context: set up environment and return code_exec tool.

    Abstract: each backend provides its own environment setup.

    Returns:
        The code_exec Tool for this execution environment.
    """
    ...

__aexit__ abstractmethod async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None

Exit async context: cleanup the execution environment.

Source code in src/stirrup/tools/code_backends/base.py
@abstractmethod
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Exit async context: cleanup the execution environment.

    Abstract: each backend must implement its own teardown.

    Args:
        exc_type: Class of the exception raised inside the context, or None.
        exc_val: The exception instance raised inside the context, or None.
        exc_tb: Traceback object associated with that exception, if any.
    """
    ...

run_command abstractmethod async

run_command(
    cmd: str, *, timeout: int = SHELL_TIMEOUT
) -> CommandResult

Execute a shell command and return raw CommandResult.

Source code in src/stirrup/tools/code_backends/base.py
@abstractmethod
async def run_command(self, cmd: str, *, timeout: int = SHELL_TIMEOUT) -> CommandResult:
    """Execute a shell command and return raw CommandResult.

    Abstract: each backend decides how the command is actually run.

    Args:
        cmd: Shell command to execute.
        timeout: Time limit for the command; defaults to SHELL_TIMEOUT.
            NOTE(review): unit is presumably seconds - confirm per backend.

    Returns:
        The raw CommandResult for the executed command.
    """
    ...

read_file_bytes abstractmethod async

read_file_bytes(path: str) -> bytes

Read file content as bytes from this execution environment.

Parameters:

Name Type Description Default
path str

File path within this execution environment (relative or absolute within the env's working directory).

required

Returns:

Type Description
bytes

File contents as bytes.

Raises:

Type Description
FileNotFoundError

If file does not exist.

RuntimeError

If execution environment not started.

Source code in src/stirrup/tools/code_backends/base.py
@abstractmethod
async def read_file_bytes(self, path: str) -> bytes:
    """Read file content as bytes from this execution environment.

    Abstract: each backend supplies its own implementation.

    Args:
        path: File path within this execution environment (relative or absolute
              within the env's working directory).

    Returns:
        File contents as bytes.

    Raises:
        FileNotFoundError: If file does not exist.
        RuntimeError: If execution environment not started.

    """
    ...

write_file_bytes abstractmethod async

write_file_bytes(path: str, content: bytes) -> None

Write bytes to a file in this execution environment.

Parameters:

Name Type Description Default
path str

Destination path within this execution environment.

required
content bytes

File contents to write.

required

Raises:

Type Description
RuntimeError

If execution environment not started.

Source code in src/stirrup/tools/code_backends/base.py
@abstractmethod
async def write_file_bytes(self, path: str, content: bytes) -> None:
    """Write bytes to a file in this execution environment.

    Abstract: each backend supplies its own implementation.

    Args:
        path: Destination path within this execution environment.
        content: File contents to write.

    Raises:
        RuntimeError: If execution environment not started.

    """
    ...

save_output_files async

save_output_files(
    paths: list[str],
    output_dir: Path | str,
    dest_env: CodeExecToolProvider | None = None,
) -> SaveOutputFilesResult

Save files from this execution environment to a destination.

Parameters:

Name Type Description Default
paths list[str]

List of file paths in this execution environment to save.

required
output_dir Path | str

Directory path to save files to.

required
dest_env CodeExecToolProvider | None

If provided, output_dir is interpreted as a path within dest_env (cross-environment transfer). If None, output_dir is a local filesystem path.

None

Returns:

Type Description
SaveOutputFilesResult

SaveOutputFilesResult containing lists of saved files and any failures.

Source code in src/stirrup/tools/code_backends/base.py
async def save_output_files(
    self,
    paths: list[str],
    output_dir: Path | str,
    dest_env: "CodeExecToolProvider | None" = None,
) -> SaveOutputFilesResult:
    """Copy files out of this execution environment.

    Each file is read through ``read_file_bytes`` and stored under its base
    name in ``output_dir``. A failure on one file is recorded and does not
    stop the remaining files from being processed.

    Args:
        paths: Paths of the files to export from this environment.
        output_dir: Directory that receives the files.
        dest_env: When given, ``output_dir`` is a path inside that environment
                  and files are written through its ``write_file_bytes``.
                  Otherwise ``output_dir`` is a local filesystem directory.

    Returns:
        SaveOutputFilesResult with the saved files and a mapping from each
        failed source path to the error message.

    """
    outcome = SaveOutputFilesResult()
    target_dir = str(output_dir)

    for src in paths:
        try:
            data = await self.read_file_bytes(src)
            size = len(data)
            basename = Path(src).name

            if not dest_env:
                # Local filesystem destination.
                local_target = Path(output_dir) / basename
                local_target.parent.mkdir(parents=True, exist_ok=True)
                logger.debug(
                    "SAVE TO LOCAL: %s (%d bytes) -> %s",
                    src,
                    size,
                    local_target,
                )
                local_target.write_bytes(data)
                outcome.saved.append(SavedFile(src, local_target, size))
                continue

            # Cross-environment destination: hand the bytes to the other env.
            remote_target = f"{target_dir}/{basename}"
            logger.debug(
                "CROSS-ENV TRANSFER: %s (%d bytes) -> %s (dest_env: %s)",
                src,
                size,
                remote_target,
                type(dest_env).__name__,
            )
            await dest_env.write_file_bytes(remote_target, data)
            outcome.saved.append(SavedFile(src, Path(remote_target), size))
        except Exception as e:
            # Best effort: remember the failure and keep going.
            logger.debug("TRANSFER FAILED: %s -> %s: %s", src, target_dir, e)
            outcome.failed[src] = str(e)

    return outcome

upload_files async

upload_files(
    *paths: Path | str,
    source_env: CodeExecToolProvider | None = None,
    dest_dir: str | None = None,
) -> UploadFilesResult

Upload files to this execution environment.

Parameters:

Name Type Description Default
*paths Path | str

File or directory paths to upload. If source_env is None, these are local filesystem paths. If source_env is provided, these are paths within source_env (cross-environment transfer).

()
source_env CodeExecToolProvider | None

If provided, paths are within source_env. If None, paths are local filesystem paths.

None
dest_dir str | None

Destination directory in this environment. If None, uses the environment's working directory.

None

Returns:

Type Description
UploadFilesResult

UploadFilesResult containing lists of uploaded files and any failures.

Raises:

Type Description
RuntimeError

If execution environment not started.

Source code in src/stirrup/tools/code_backends/base.py
async def upload_files(
    self,
    *paths: Path | str,
    source_env: "CodeExecToolProvider | None" = None,
    dest_dir: str | None = None,
) -> UploadFilesResult:
    """Upload files to this execution environment.

    Args:
        *paths: File or directory paths to upload. If source_env is None, these
                are local filesystem paths. If source_env is provided, these are
                paths within source_env (cross-environment transfer).
        source_env: If provided, paths are within source_env. If None, paths are
                    local filesystem paths.
        dest_dir: Destination directory in this environment.
                  If None, destination paths are left relative.

    Returns:
        UploadFilesResult containing lists of uploaded files and any failures.
        A failure for one path is recorded under that path and does not stop
        the remaining paths from being processed.

    Raises:
        RuntimeError: If execution environment not started.

    """
    result = UploadFilesResult()
    dest_dir_str = dest_dir or ""

    def dest_for(relative: str) -> str:
        """Join a relative name onto the destination directory, if one was given."""
        return f"{dest_dir_str}/{relative}" if dest_dir_str else relative

    async def push_local(file_path: Path, dest_path: str) -> None:
        """Read one local file and write it into this environment."""
        content = file_path.read_bytes()
        logger.debug(
            "UPLOAD FROM LOCAL: %s (%d bytes) -> %s",
            file_path,
            len(content),
            dest_path,
        )
        await self.write_file_bytes(dest_path, content)
        result.uploaded.append(UploadedFile(file_path, dest_path, len(content)))

    for path in paths:
        path_str = str(path)
        try:
            # Compare against None explicitly: the documented contract is
            # "None means local", independent of the provider's truthiness.
            if source_env is not None:
                # Cross-environment transfer: read from source_env
                content = await source_env.read_file_bytes(path_str)
                dest_path = dest_for(Path(path_str).name)
                logger.debug(
                    "UPLOAD CROSS-ENV: %s (%d bytes) from %s -> %s",
                    path_str,
                    len(content),
                    type(source_env).__name__,
                    dest_path,
                )
                await self.write_file_bytes(dest_path, content)
                result.uploaded.append(UploadedFile(Path(path_str), dest_path, len(content)))
                continue

            # Local filesystem upload: generic implementation that reads from
            # the local filesystem and writes through write_file_bytes.
            local_path = Path(path)
            if local_path.is_dir():
                # Walk the directory recursively, keeping the relative layout.
                for file_path in local_path.rglob("*"):
                    if file_path.is_file():
                        # as_posix() keeps "/" separators even on Windows hosts,
                        # matching the "/"-joined destination path above.
                        rel_path = file_path.relative_to(local_path).as_posix()
                        await push_local(file_path, dest_for(rel_path))
            else:
                await push_local(local_path, dest_for(local_path.name))
        except Exception as e:
            # Best effort: record the failure for this path and continue.
            logger.debug("UPLOAD FAILED: %s -> %s: %s", path_str, dest_dir_str, e)
            result.failed[path_str] = str(e)

    return result

get_code_exec_tool

get_code_exec_tool(
    *,
    name: str = "code_exec",
    description: str | None = None,
) -> Tool[CodeExecutionParams, ToolUseCountMetadata]

Create a code execution tool for this environment.

Parameters:

Name Type Description Default
name str

Tool name

'code_exec'
description str | None

Tool description

None

Returns:

Type Description
Tool[CodeExecutionParams, ToolUseCountMetadata]

Tool[CodeExecutionParams, ToolUseCountMetadata] that executes commands in this environment

Source code in src/stirrup/tools/code_backends/base.py
def get_code_exec_tool(
    self,
    *,
    name: str = "code_exec",
    description: str | None = None,
) -> Tool[CodeExecutionParams, ToolUseCountMetadata]:
    """Build the Tool that runs shell commands in this environment.

    Args:
        name: Name to register the tool under.
        description: Tool description; a built-in default is used when this
            is None or empty.

    Returns:
        Tool[CodeExecutionParams, ToolUseCountMetadata] whose executor passes
        the command to ``run_command`` and formats the result.

    """
    tool_description = (
        description
        or "Execute a shell command in the execution environment. Returns exit code, stdout, and stderr as XML."
    )

    async def executor(params: CodeExecutionParams) -> ToolResult[ToolUseCountMetadata]:
        # The closure captures `self`, tying the tool to this environment.
        return format_result(await self.run_command(params.cmd))

    return Tool[CodeExecutionParams, ToolUseCountMetadata](
        name=name,
        description=tool_description,
        parameters=CodeExecutionParams,
        executor=executor,  # ty: ignore[invalid-argument-type]
    )

get_view_image_tool

get_view_image_tool(
    *,
    name: str = "view_image",
    description: str | None = None,
) -> Tool[ViewImageParams, ToolUseCountMetadata]

Create a view_image tool for this environment.

Parameters:

Name Type Description Default
name str

Tool name

'view_image'
description str | None

Tool description

None

Returns:

Type Description
Tool[ViewImageParams, ToolUseCountMetadata]

Tool[ViewImageParams, ToolUseCountMetadata] that views images in this environment

Source code in src/stirrup/tools/code_backends/base.py
def get_view_image_tool(
    self,
    *,
    name: str = "view_image",
    description: str | None = None,
) -> Tool[ViewImageParams, ToolUseCountMetadata]:
    """Build the Tool that lets the model look at an image in this environment.

    Args:
        name: Name to register the tool under.
        description: Tool description; a built-in default is used when this
            is None or empty.

    Returns:
        Tool[ViewImageParams, ToolUseCountMetadata] whose executor loads the
        image through ``view_image``. A missing file or a ValueError is
        reported as text in the tool result instead of being raised.

    """

    def reply(content) -> ToolResult[ToolUseCountMetadata]:
        # Every outcome shares the same metadata wrapper.
        return ToolResult(content=content, metadata=ToolUseCountMetadata())

    async def executor(params: ViewImageParams) -> ToolResult[ToolUseCountMetadata]:
        try:
            image = await self.view_image(params.path)
            return reply(["Viewing image at path: " + params.path, image])
        except FileNotFoundError:
            return reply(f"Image `{params.path}` not found.")
        except ValueError as e:
            return reply(str(e))

    return Tool[ViewImageParams, ToolUseCountMetadata](
        name=name,
        description=description or "View an image file from the execution environment's filesystem.",
        parameters=ViewImageParams,
        executor=executor,  # ty: ignore[invalid-argument-type]
    )

view_image abstractmethod async

view_image(path: str) -> ImageContentBlock

Read and return an image file from the execution environment.

Parameters:

Name Type Description Default
path str

Path to image file in the execution environment (relative or absolute).

required

Returns:

Type Description
ImageContentBlock

ImageContentBlock containing the image data.

Raises:

Type Description
RuntimeError

If execution environment not started.

FileNotFoundError

If file does not exist.

ValueError

If path is outside the execution environment, is a directory, or the file is not a valid image.

Source code in src/stirrup/tools/code_backends/base.py
@abstractmethod
async def view_image(self, path: str) -> ImageContentBlock:
    """Read and return an image file from the execution environment.

    Abstract: each backend supplies its own implementation.

    Args:
        path: Path to image file in the execution environment (relative or absolute).

    Returns:
        ImageContentBlock containing the image data.

    Raises:
        RuntimeError: If execution environment not started.
        FileNotFoundError: If file does not exist.
        ValueError: If path is outside the execution environment, is a directory,
                    or the file is not a valid image.

    """
    ...

LocalCodeExecToolProvider

LocalCodeExecToolProvider(
    *,
    allowed_commands: list[str] | None = None,
    temp_base_dir: Path | str | None = None,
    description: str | None = None,
)

Bases: CodeExecToolProvider

Local code execution tool provider using an isolated temp directory.

Commands are executed with the temp directory as the working directory. An optional allowlist can restrict which commands are permitted.

Usage with Agent

from stirrup.clients.chat_completions_client import ChatCompletionsClient

client = ChatCompletionsClient(model="gpt-5")
agent = Agent(client=client, name="assistant", tools=[LocalCodeExecToolProvider(), CALCULATOR_TOOL])

async with agent.session(output_dir="./output") as session:
    await session.run("Run some Python code")

Standalone usage

provider = LocalCodeExecToolProvider()

async with provider as tool:
    # tool is a Tool instance for code execution
    result = await provider.run_command("python script.py")
    await provider.save_output_files(["output.txt"], "/path/to/output")

Initialize LocalCodeExecToolProvider configuration.

Parameters:

Name Type Description Default
allowed_commands list[str] | None

Optional list of regex patterns. If provided, only commands matching at least one pattern are allowed. If None, all commands are allowed.

None
temp_base_dir Path | str | None

Optional base directory for creating the execution environment temp directory. If None, uses the system default temp directory.

None
description str | None

Optional description of the tool. If None, uses the default description.

None
Source code in src/stirrup/tools/code_backends/local.py
def __init__(
    self,
    *,
    allowed_commands: list[str] | None = None,
    temp_base_dir: Path | str | None = None,
    description: str | None = None,
) -> None:
    """Record configuration; no directory is created here.

    Args:
        allowed_commands: Regex patterns describing permitted commands. With a
                         list, only commands matching at least one pattern are
                         allowed; with None, every command is allowed.
        temp_base_dir: Parent directory for the environment's temp directory.
                      When None (or empty), the system default is used.
        description: Description for the code execution tool. When None (or
                     empty), a built-in default is used.

    """
    super().__init__(allowed_commands=allowed_commands)

    # Stays None until the provider is started.
    self._temp_dir: Path | None = None

    base_dir: Path | None = None
    if temp_base_dir:
        base_dir = Path(temp_base_dir)
    self._temp_base_dir: Path | None = base_dir

    default_description = "Execute a shell command in the execution environment. Returns exit code, stdout, and stderr as XML. Use `uv` to manage packages."
    self._description = description or default_description

temp_dir property

temp_dir: Path | None

Return the temp directory path, or None if not started.

__aenter__ async

Create temp directory and return the code_exec tool.

Source code in src/stirrup/tools/code_backends/local.py
async def __aenter__(self) -> "Tool[CodeExecutionParams, ToolUseCountMetadata]":
    """Start the environment: create its temp directory and return the code_exec tool."""
    base_dir = self._temp_base_dir
    if base_dir:
        # Make sure the configured parent exists before creating a child in it.
        base_dir.mkdir(parents=True, exist_ok=True)

    created = tempfile.mkdtemp(prefix="local_exec_env_", dir=base_dir)
    self._temp_dir = Path(created)
    logger.info("Created local execution environment temp directory: %s", self._temp_dir)

    return self.get_code_exec_tool(description=self._description)

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None

Cleanup the local execution environment.

Source code in src/stirrup/tools/code_backends/local.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Cleanup the local execution environment."""
    if self._temp_dir and self._temp_dir.exists():
        try:
            shutil.rmtree(self._temp_dir)
        except Exception as exc:
            logger.warning("Failed to cleanup temp directory %s: %s", self._temp_dir, exc)
    self._temp_dir = None

read_file_bytes async

read_file_bytes(path: str) -> bytes

Read file content as bytes from the temp directory.

Parameters:

Name Type Description Default
path str

File path (relative or absolute within the temp dir).

required

Returns:

Type Description
bytes

File contents as bytes.

Raises:

Type Description
RuntimeError

If environment not started.

ValueError

If path is outside temp directory.

FileNotFoundError

If file does not exist.

Source code in src/stirrup/tools/code_backends/local.py
async def read_file_bytes(self, path: str) -> bytes:
    """Return the raw contents of a file inside the temp directory.

    Args:
        path: File path (relative or absolute within the temp dir).

    Returns:
        File contents as bytes.

    Raises:
        RuntimeError: If environment not started.
        ValueError: If path is outside temp directory.
        FileNotFoundError: If file does not exist.

    """
    # Path resolution and validation are delegated to the helper.
    target = self._resolve_and_validate_path(path)
    if target.exists():
        return target.read_bytes()
    raise FileNotFoundError(f"File not found: {path}")

write_file_bytes async

write_file_bytes(path: str, content: bytes) -> None

Write bytes to a file in the temp directory.

Parameters:

Name Type Description Default
path str

Destination path (relative or absolute within the temp dir).

required
content bytes

File contents to write.

required

Raises:

Type Description
RuntimeError

If environment not started.

ValueError

If path is outside temp directory.

Source code in src/stirrup/tools/code_backends/local.py
async def write_file_bytes(self, path: str, content: bytes) -> None:
    """Store bytes in a file inside the temp directory, creating parent folders.

    Args:
        path: Destination path (relative or absolute within the temp dir).
        content: File contents to write.

    Raises:
        RuntimeError: If environment not started.
        ValueError: If path is outside temp directory.

    """
    # Path resolution and validation are delegated to the helper.
    destination = self._resolve_and_validate_path(path)
    parent_dir = destination.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(content)

run_command async

run_command(
    cmd: str, *, timeout: int = SHELL_TIMEOUT
) -> CommandResult

Execute command in the temp directory.

Parameters:

Name Type Description Default
cmd str

Shell command to execute (bash syntax).

required
timeout int

Maximum time in seconds to wait for command completion.

SHELL_TIMEOUT

Returns:

Type Description
CommandResult

CommandResult with exit_code, stdout, stderr, and optional error info.

Source code in src/stirrup/tools/code_backends/local.py
async def run_command(self, cmd: str, *, timeout: int = SHELL_TIMEOUT) -> CommandResult:
    """Execute command in the temp directory.

    The command is run as ``bash -c <cmd>`` with the temp directory as the
    working directory. Apart from the "not started" check, failures are
    reported through the returned CommandResult rather than raised: a command
    rejected by the allowlist, a timeout, and any other exception each yield
    ``exit_code=1`` with a matching ``error_kind``.

    Args:
        cmd: Shell command to execute (bash syntax).
        timeout: Maximum time in seconds to wait for command completion.

    Returns:
        CommandResult with exit_code, stdout, stderr, and optional error info.

    Raises:
        RuntimeError: If the environment has not been started (no temp directory).

    """
    if self._temp_dir is None:
        raise RuntimeError(
            "ExecutionEnvironment not started. Ensure current Agent is equipped with a CodeExecToolProvider."
        )

    # Check allowlist
    if not self._check_allowed(cmd):
        return CommandResult(
            exit_code=1,
            stdout="",
            stderr=f"Command not allowed: '{cmd}' does not match any allowed patterns",
            error_kind="command_not_allowed",
            advice="Only commands matching the allowlist patterns are permitted.",
        )

    # Check for absolute paths (local environment is not sandboxed)
    # The helper returns a ready-made result when it objects to the command.
    absolute_path_error = self._check_absolute_paths(cmd)
    if absolute_path_error:
        return absolute_path_error

    # Kept outside the try so the timeout handler can tell whether a process was started.
    process = None
    try:
        # The timeout covers process start-up, output collection and the final wait.
        with anyio.fail_after(timeout):
            # Use shell=True by wrapping in a shell command
            process = await anyio.open_process(
                ["bash", "-c", cmd],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=self._temp_dir,
            )

            # Read all output from streams concurrently
            stdout_chunks: list[bytes] = []
            stderr_chunks: list[bytes] = []

            async def read_stdout() -> None:
                if process.stdout:
                    stdout_chunks.extend([chunk async for chunk in process.stdout])

            async def read_stderr() -> None:
                if process.stderr:
                    stderr_chunks.extend([chunk async for chunk in process.stderr])

            # The task group exits only after both readers have finished.
            async with anyio.create_task_group() as tg:
                tg.start_soon(read_stdout)
                tg.start_soon(read_stderr)

            await process.wait()

            # A returncode of None is reported as 0; output is decoded leniently
            # so invalid UTF-8 bytes become replacement characters.
            return CommandResult(
                exit_code=process.returncode or 0,
                stdout=b"".join(stdout_chunks).decode("utf-8", errors="replace"),
                stderr=b"".join(stderr_chunks).decode("utf-8", errors="replace"),
            )

    except TimeoutError:
        # Any output collected before the timeout is discarded.
        if process:
            # NOTE(review): the process is killed but not awaited here, and the
            # generic handler below does not terminate it at all - confirm that
            # cleanup of the child happens elsewhere.
            process.kill()
        return CommandResult(
            exit_code=1,
            stdout="",
            stderr=f"Command timed out after {timeout} seconds",
            error_kind="timeout",
        )
    except Exception as exc:
        # Any other failure is converted into a result instead of propagating.
        return CommandResult(
            exit_code=1,
            stdout="",
            stderr=str(exc),
            error_kind="execution_error",
        )

save_output_files async

save_output_files(
    paths: list[str],
    output_dir: Path | str,
    dest_env: CodeExecToolProvider | None = None,
) -> SaveOutputFilesResult

Move files from the temp directory to a destination.

When dest_env is None (local filesystem), files are MOVED (not copied) - originals are deleted from the execution environment. Existing files in output_dir are silently overwritten.

When dest_env is provided (cross-environment transfer), files are copied using the base class implementation via read/write primitives.

Parameters:

Name Type Description Default
paths list[str]

List of file paths in the execution environment (relative or absolute). Relative paths are resolved against the execution environment temp directory.

required
output_dir Path | str

Directory path to save files to.

required
dest_env CodeExecToolProvider | None

If provided, output_dir is interpreted as a path within dest_env (cross-environment transfer). If None, output_dir is a local filesystem path.

None

Returns:

Type Description
SaveOutputFilesResult

SaveOutputFilesResult containing lists of saved files and any failures.

Source code in src/stirrup/tools/code_backends/local.py
async def save_output_files(
    self,
    paths: list[str],
    output_dir: Path | str,
    dest_env: "CodeExecToolProvider | None" = None,
) -> SaveOutputFilesResult:
    """Transfer files out of the temp directory into a destination directory.

    Without a dest_env the destination is the local filesystem and every file
    is MOVED: the original disappears from the execution environment, and a
    file already present in output_dir under the same name is silently
    replaced.

    With a dest_env the work is delegated to the base class, which copies the
    files across environments through the read/write primitives.

    Args:
        paths: File paths in the execution environment (relative or absolute).
               Relative paths are resolved against the temp directory.
        output_dir: Directory the files are saved into.
        dest_env: Target environment for a cross-environment transfer, in which
                  case output_dir is a path inside it. None means output_dir
                  is a local filesystem path.

    Returns:
        SaveOutputFilesResult listing the saved files and, per failed path,
        the reason it was not saved.

    Raises:
        RuntimeError: If the environment has not been started.

    """
    if self._temp_dir is None:
        raise RuntimeError(
            "ExecutionEnvironment not started. Ensure current Agent is equipped with a CodeExecToolProvider."
        )

    # Cross-environment transfers are handled entirely by the base class.
    if dest_env is not None:
        return await super().save_output_files(paths, output_dir, dest_env)

    # Local destination: make sure it exists, then move files one by one.
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    outcome = SaveOutputFilesResult()

    for requested in paths:
        try:
            candidate = Path(requested)
            if not candidate.is_absolute():
                candidate = self._temp_dir / candidate

            # Work out why this path must be rejected, if at all. The
            # containment check comes first so nothing outside the temp
            # directory is ever probed or moved.
            if not candidate.resolve().is_relative_to(self._temp_dir.resolve()):
                reason = "Path is outside execution environment directory"
                log_template = "Attempted to access path outside execution environment: %s"
            elif not candidate.exists():
                reason = "File does not exist"
                log_template = "Execution environment file does not exist: %s"
            elif not candidate.is_file():
                reason = "Path is not a file"
                log_template = "Execution environment path is not a file: %s"
            else:
                reason = None
                log_template = ""

            if reason is not None:
                outcome.failed[requested] = reason
                logger.warning(log_template, requested)
                continue

            # Size has to be captured before the move removes the source.
            size_in_bytes = candidate.stat().st_size
            moved_to = target_dir / candidate.name

            shutil.move(str(candidate), str(moved_to))
            logger.info("Moved file: %s -> %s", candidate, moved_to)

            outcome.saved.append(
                SavedFile(
                    source_path=requested,
                    output_path=moved_to,
                    size=size_in_bytes,
                ),
            )

        except Exception as exc:
            # One bad path must not abort the rest of the batch.
            outcome.failed[requested] = str(exc)
            logger.exception("Failed to move file: %s", requested)

    return outcome

upload_files async

upload_files(
    *paths: Path | str,
    source_env: CodeExecToolProvider | None = None,
    dest_dir: str | None = None,
) -> UploadFilesResult

Upload files to the execution environment.

When source_env is None (local filesystem), files are COPIED (not moved) - originals remain on the local filesystem. Directories are uploaded recursively, preserving their structure.

When source_env is provided (cross-environment transfer), files are copied using the base class implementation via read/write primitives.

Parameters:

Name Type Description Default
*paths Path | str

File or directory paths to upload. If source_env is None, these are local filesystem paths. If source_env is provided, these are paths within source_env.

()
source_env CodeExecToolProvider | None

If provided, paths are within source_env. If None, paths are local filesystem paths.

None
dest_dir str | None

Destination subdirectory within the temp directory. If None, files are placed directly in the temp directory.

None

Returns:

Type Description
UploadFilesResult

UploadFilesResult containing lists of uploaded files and any failures.

Source code in src/stirrup/tools/code_backends/local.py
async def upload_files(
    self,
    *paths: Path | str,
    source_env: "CodeExecToolProvider | None" = None,
    dest_dir: str | None = None,
) -> UploadFilesResult:
    """Bring files or directories into the execution environment.

    Without a source_env the inputs are local filesystem paths and are COPIED,
    so the originals stay where they are. A directory is copied recursively
    with its structure intact, and each file inside it is reported separately.

    With a source_env the work is delegated to the base class, which copies
    the files across environments through the read/write primitives.

    Args:
        *paths: Files or directories to upload. Local filesystem paths when
                source_env is None, otherwise paths inside source_env.
        source_env: Environment the paths live in, or None for the local
                    filesystem.
        dest_dir: Subdirectory of the temp directory to upload into. None
                  places the uploads directly in the temp directory.

    Returns:
        UploadFilesResult listing the uploaded files and, per failed source,
        the reason it was not uploaded.

    Raises:
        RuntimeError: If the environment has not been started.

    """
    if self._temp_dir is None:
        raise RuntimeError(
            "ExecutionEnvironment not started. Ensure current Agent is equipped with a CodeExecToolProvider."
        )

    # Cross-environment transfers are handled entirely by the base class.
    if source_env is not None:
        return await super().upload_files(*paths, source_env=source_env, dest_dir=dest_dir)

    # Local source: copy straight into the (possibly nested) upload root.
    upload_root = self._temp_dir
    if dest_dir:
        upload_root = self._temp_dir / dest_dir
    upload_root.mkdir(parents=True, exist_ok=True)

    outcome = UploadFilesResult()

    for given in paths:
        local_path = Path(given).resolve()

        if not local_path.exists():
            outcome.failed[str(local_path)] = "File or directory does not exist"
            logger.warning("Upload source does not exist: %s", local_path)
            continue

        try:
            target = upload_root / local_path.name

            if local_path.is_file():
                shutil.copy2(local_path, target)
                outcome.uploaded.append(
                    UploadedFile(
                        source_path=local_path,
                        dest_path=str(target.relative_to(self._temp_dir)),
                        size=local_path.stat().st_size,
                    ),
                )
                logger.debug("Uploaded file: %s -> %s", local_path, target)

            elif local_path.is_dir():
                shutil.copytree(local_path, target, dirs_exist_ok=True)
                # Report every regular file found under the source directory.
                for member in local_path.rglob("*"):
                    if not member.is_file():
                        continue
                    copied_member = target / member.relative_to(local_path)
                    outcome.uploaded.append(
                        UploadedFile(
                            source_path=member,
                            dest_path=str(copied_member.relative_to(self._temp_dir)),
                            size=member.stat().st_size,
                        ),
                    )
                logger.debug("Uploaded directory: %s -> %s", local_path, target)

        except Exception as exc:
            # One bad source must not abort the rest of the batch.
            outcome.failed[str(local_path)] = str(exc)
            logger.exception("Failed to upload: %s", local_path)

    return outcome

view_image async

view_image(path: str) -> ImageContentBlock

Read and return an image file from the local execution environment.

Parameters:

Name Type Description Default
path str

Path to image file (relative to temp directory, or absolute within it).

required

Returns:

Type Description
ImageContentBlock

ImageContentBlock containing the image data.

Raises:

Type Description
RuntimeError

If execution environment not started.

FileNotFoundError

If file does not exist.

ValueError

If path is outside temp directory, is a directory, or not a valid image.

Source code in src/stirrup/tools/code_backends/local.py
async def view_image(self, path: str) -> ImageContentBlock:
    """Load an image from the local execution environment as a content block.

    The file is read through ``read_file_bytes`` and its raw bytes are wrapped
    in an ImageContentBlock.

    Args:
        path: Location of the image (relative to the temp directory, or absolute within it).

    Returns:
        ImageContentBlock built from the file's bytes.

    Raises:
        RuntimeError: If execution environment not started.
        FileNotFoundError: If file does not exist.
        ValueError: If path is outside temp directory, is a directory, or not a valid image.

    """
    return ImageContentBlock(data=await self.read_file_bytes(path))

AgentLogger

AgentLogger(
    *, show_spinner: bool = True, level: int = INFO
)

Bases: AgentLoggerBase

Rich console logger for agent workflows.

Implements AgentLoggerBase with rich formatting, spinners, and visual hierarchy. Each agent (including sub-agents) should have its own logger instance.

Usage

from stirrup.clients.chat_completions_client import ChatCompletionsClient

# Agent creates logger internally by default
client = ChatCompletionsClient(model="gpt-4")
agent = Agent(client=client, name="assistant")

# Or pass a pre-configured logger
logger = AgentLogger(show_spinner=False)
agent = Agent(client=client, name="assistant", logger=logger)

Agent sets these properties before calling `__enter__`:

logger.name, logger.model, logger.max_turns, logger.depth

Agent sets these before calling `__exit__`:

logger.finish_params, logger.run_metadata, logger.output_dir

Initialize the agent logger.

Parameters:

Name Type Description Default
show_spinner bool

Whether to show a spinner while agent runs (only for depth=0)

True
level int

Logging level (default: INFO)

INFO
Source code in src/stirrup/utils/logging.py
def __init__(
    self,
    *,
    show_spinner: bool = True,
    level: int = logging.INFO,
) -> None:
    """Set up a logger with default state and configure rich logging.

    Args:
        show_spinner: Whether to show a spinner while agent runs (only for depth=0)
        level: Logging level (default: INFO)
    """
    # Caller-supplied configuration
    self._level = level
    self._show_spinner = show_spinner

    # Identity of the agent; Agent overwrites these before __enter__
    self.name: str = "agent"
    self.depth: int = 0
    self.model: str | None = None
    self.max_turns: int | None = None

    # Final-run details; Agent fills these in before __exit__
    self.output_dir: str | None = None
    self.run_metadata: dict[str, list[Any]] | None = None
    self.finish_params: BaseModel | None = None

    # Live progress counters shown by the spinner (depth == 0 with show_spinner only)
    self._live: Live | None = None
    self._current_step = 0
    self._tool_calls = 0
    self._input_tokens = 0
    self._output_tokens = 0

    # Must run last, once all state above exists
    self._configure_logging()

__enter__

__enter__() -> Self

Enter logging context. Logs agent start and starts spinner if depth=0.

Source code in src/stirrup/utils/logging.py
def __enter__(self) -> Self:
    """Open the logging context: announce the agent and, at depth 0, start the spinner."""
    is_top_level = self.depth == 0

    # The model name, when known, is shown in parentheses after the agent name
    model_suffix = f" ({self.model})" if self.model else ""

    if is_top_level:
        console.rule(f"[bold cyan]▶ {self.name}{model_suffix}[/]", style="cyan")
    else:
        # Sub-agents get an indented rule that also shows their nesting level
        heading = f"▶ {self.name}: Level {self.depth}{model_suffix}"
        self._print_indented(
            Rule(f"[bold cyan]{heading}[/]", style="cyan"),
            self.depth * SUBAGENT_INDENT_SPACES,
        )
    console.print()

    # Only the top-level agent owns a live spinner
    if is_top_level and self._show_spinner:
        spinner = Live(self._make_spinner(), console=console, refresh_per_second=10)
        self._live = spinner
        spinner.start()

    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None

Exit logging context. Stops spinner and logs completion stats.

Source code in src/stirrup/utils/logging.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Close the logging context: halt the spinner, then log the finish summary.

    When the context exits because of an exception, its string form is passed
    to the finish log as the error; otherwise the error is None.
    """
    # The spinner has to go before anything else is printed
    spinner = self._live
    if spinner:
        spinner.stop()
        self._live = None

    self._log_finish(error=None if exc_type is None else str(exc_val))

on_step

on_step(
    step: int,
    tool_calls: int = 0,
    input_tokens: int = 0,
    output_tokens: int = 0,
) -> None

Report step progress and stats during agent execution.

Source code in src/stirrup/utils/logging.py
def on_step(
    self,
    step: int,
    tool_calls: int = 0,
    input_tokens: int = 0,
    output_tokens: int = 0,
) -> None:
    """Record the latest progress figures and redraw the spinner if one is running."""
    self._current_step, self._tool_calls = step, tool_calls
    self._input_tokens, self._output_tokens = input_tokens, output_tokens

    # The spinner is rebuilt only after the counters above are stored
    spinner = self._live
    if spinner:
        spinner.update(self._make_spinner())

set_level

set_level(level: int) -> None

Set the logging level.

Source code in src/stirrup/utils/logging.py
def set_level(self, level: int) -> None:
    """Change the logging level of this logger and of the root logger."""
    self._level = level
    # The root logger is kept in step with this logger's level
    root_logger = logging.getLogger()
    root_logger.setLevel(level)

is_enabled_for

is_enabled_for(level: int) -> bool

Check if a given log level is enabled.

Source code in src/stirrup/utils/logging.py
def is_enabled_for(self, level: int) -> bool:
    """Report whether messages at ``level`` pass this logger's threshold."""
    threshold = self._level
    return threshold <= level

debug

debug(message: str, *args: object) -> None

Log a debug message (dim style).

Source code in src/stirrup/utils/logging.py
def debug(self, message: str, *args: object) -> None:
    """Print a dim debug line; ``args`` are %-interpolated into ``message`` when given."""
    if self._level > logging.DEBUG:
        return
    rendered = message % args if args else message
    console.print(f"[dim]{rendered}[/]")

info

info(message: str, *args: object) -> None

Log an info message.

Source code in src/stirrup/utils/logging.py
def info(self, message: str, *args: object) -> None:
    """Print an info line; ``args`` are %-interpolated into ``message`` when given."""
    if self._level > logging.INFO:
        return
    rendered = message % args if args else message
    console.print(rendered)

warning

warning(message: str, *args: object) -> None

Log a warning message (yellow style).

Source code in src/stirrup/utils/logging.py
def warning(self, message: str, *args: object) -> None:
    """Print a yellow warning line; ``args`` are %-interpolated into ``message`` when given."""
    if self._level > logging.WARNING:
        return
    rendered = message % args if args else message
    console.print(f"[yellow]⚠ {rendered}[/]")

error

error(message: str, *args: object) -> None

Log an error message (red style).

Source code in src/stirrup/utils/logging.py
def error(self, message: str, *args: object) -> None:
    """Print a red error line; ``args`` are %-interpolated into ``message`` when given."""
    if self._level > logging.ERROR:
        return
    rendered = message % args if args else message
    console.print(f"[red]✗ {rendered}[/]")

critical

critical(message: str, *args: object) -> None

Log a critical message (bold red style).

Source code in src/stirrup/utils/logging.py
def critical(self, message: str, *args: object) -> None:
    """Print a bold red critical line; ``args`` are %-interpolated into ``message`` when given."""
    if self._level > logging.CRITICAL:
        return
    rendered = message % args if args else message
    console.print(f"[bold red]✗ CRITICAL: {rendered}[/]")

exception

exception(message: str, *args: object) -> None

Log an error message with exception traceback (red style with traceback).

Source code in src/stirrup/utils/logging.py
def exception(self, message: str, *args: object) -> None:
    """Print a red error line followed by the console's exception traceback."""
    if self._level > logging.ERROR:
        return
    rendered = message % args if args else message
    console.print(f"[red]✗ {rendered}[/]")
    console.print_exception()

assistant_message

assistant_message(
    turn: int,
    max_turns: int,
    assistant_message: AssistantMessage,
) -> None

Log an assistant message with content and tool calls in a panel.

Parameters:

Name Type Description Default
turn int

Current turn number (1-indexed)

required
max_turns int

Maximum number of turns

required
assistant_message AssistantMessage

The assistant's response message

required
Source code in src/stirrup/utils/logging.py
def assistant_message(
    self,
    turn: int,
    max_turns: int,
    assistant_message: AssistantMessage,
) -> None:
    """Log an assistant message with content and tool calls in a panel.

    Nothing is printed when the logger level is above INFO. Text content is
    truncated to 500 characters and each tool call's formatted arguments to
    1000 characters. Tool-call arguments that cannot be parsed as JSON are
    shown verbatim instead of raising, so a malformed model response cannot
    break logging.

    Args:
        turn: Current turn number (1-indexed)
        max_turns: Maximum number of turns
        assistant_message: The assistant's response message
    """
    if self._level > logging.INFO:
        return

    # Build panel content
    content = Text()

    # Add assistant content if present
    if assistant_message.content:
        text = assistant_message.content
        if isinstance(text, list):
            text = "\n".join(str(block) for block in text)
        # Truncate long content
        if len(text) > 500:
            text = text[:500] + "..."
        content.append(text, style="white")

    # Add tool calls if present
    if assistant_message.tool_calls:
        if assistant_message.content:
            content.append("\n\n")
        content.append("Tool Calls:\n", style="bold magenta")
        for tc in assistant_message.tool_calls:
            try:
                # Round-trip through JSON purely to pretty-print the arguments
                args_formatted = json.dumps(json.loads(tc.arguments), indent=2, ensure_ascii=False)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers
                # non-string arguments. Fall back to the raw text so the
                # message is still logged.
                args_formatted = str(tc.arguments)
            args_preview = args_formatted[:1000] + "..." if len(args_formatted) > 1000 else args_formatted
            content.append(f"  🔧 {tc.name}", style="magenta")
            content.append(args_preview, style="dim")

    # Create and print panel with agent name in title
    title = f"[bold]AssistantMessage[/bold] │ {self.name} │ Turn {turn}/{max_turns}"
    panel = Panel(content, title=title, title_align="left", border_style="yellow", padding=(0, 1))

    if self.depth > 0:
        self._print_indented(panel, self.depth * SUBAGENT_INDENT_SPACES)
    else:
        console.print(panel)

user_message

user_message(user_message: UserMessage) -> None

Log a user message in a panel.

Parameters:

Name Type Description Default
user_message UserMessage

The user's message

required
Source code in src/stirrup/utils/logging.py
def user_message(self, user_message: UserMessage) -> None:
    """Show a user message inside a blue panel.

    Nothing is printed when the logger level is above INFO. Content longer
    than 500 characters is cut and suffixed with "...".

    Args:
        user_message: The user's message
    """
    if self._level > logging.INFO:
        return

    body = Text()

    raw_content = user_message.content
    if raw_content:
        # List content is flattened to one string, one entry per line
        if isinstance(raw_content, list):
            rendered = "\n".join(str(part) for part in raw_content)
        else:
            rendered = raw_content
        if len(rendered) > 500:
            rendered = rendered[:500] + "..."
        body.append(rendered, style="white")

    # The agent name goes in the panel title
    panel = Panel(
        body,
        title=f"[bold]UserMessage[/bold] │ {self.name}",
        title_align="left",
        border_style="blue",
        padding=(0, 1),
    )

    if self.depth > 0:
        self._print_indented(panel, self.depth * SUBAGENT_INDENT_SPACES)
        return
    console.print(panel)

task_message

task_message(task: str | list[Any]) -> None

Log the initial task/prompt at the start of a run.

Source code in src/stirrup/utils/logging.py
def task_message(self, task: str | list[Any]) -> None:
    """Print the initial task/prompt of a run, indented for sub-agents.

    Nothing is printed when the logger level is above INFO. The task is
    collapsed onto a single line: every line is stripped, empty lines are
    dropped and the rest are joined with single spaces.
    """
    if self._level > logging.INFO:
        return

    # List content becomes one string, one entry per line
    if isinstance(task, list):
        task = "\n".join(str(part) for part in task)

    # Collapse the (possibly multi-line) task into one whitespace-normalized line
    stripped_lines = (raw_line.strip() for raw_line in task.split("\n"))
    task = " ".join(piece for piece in stripped_lines if piece)

    # Nested agents get a different label and a depth-based left margin
    nested = self.depth > 0
    label = "Sub Agent" if nested else "Agent"
    margin = " " * (self.depth * SUBAGENT_INDENT_SPACES) if nested else ""

    console.print(f"{margin}[bold]{label} Task:[/bold]")
    console.print()
    # The task is a single line at this point, so one print covers it
    console.print(f"{margin}{task}")

    console.print()  # Blank line after the task section

warnings_message

warnings_message(warnings: list[str]) -> None

Display warnings at run start as simple text.

Source code in src/stirrup/utils/logging.py
def warnings_message(self, warnings: list[str]) -> None:
    """Print run-start warnings as plain orange text, one per line with a gap after each.

    Nothing is printed when the list is empty or the logger level is above INFO.
    """
    if not warnings or self._level > logging.INFO:
        return

    console.print("[bold orange1]Warnings[/bold orange1]")
    console.print()
    for entry in warnings:
        console.print(f"[orange1]⚠ {entry}[/orange1]")
        console.print()  # Blank line between warnings

tool_result

tool_result(tool_message: ToolMessage) -> None

Log a single tool execution result in a panel with XML syntax highlighting.

Parameters:

Name Type Description Default
tool_message ToolMessage

The tool execution result

required
Source code in src/stirrup/utils/logging.py
def tool_result(self, tool_message: ToolMessage) -> None:
    """Show one tool execution result in a green panel, highlighted as XML.

    Nothing is printed when the logger level is above INFO. HTML entities in
    the result are unescaped, and results longer than 1000 characters are cut
    and suffixed with "...".

    Args:
        tool_message: The tool execution result
    """
    if self._level > logging.INFO:
        return

    # List content is flattened to one string, one entry per line
    raw_result = tool_message.content
    if isinstance(raw_result, list):
        raw_result = "\n".join(str(part) for part in raw_result)

    # Turn entities such as &lt; &gt; &amp; back into literal characters
    decoded = html.unescape(raw_result)
    preview = decoded if len(decoded) <= 1000 else decoded[:1000] + "..."

    highlighted = Syntax(preview, "xml", theme="monokai", word_wrap=True)

    # Tick or cross in the title reflects whether the tool arguments were valid
    if tool_message.args_was_valid:
        marker, marker_style = "✓", "green"
    else:
        marker, marker_style = "✗", "red"
    tool_label = tool_message.name or "unknown"
    header = (
        f"[{marker_style}]{marker}[/{marker_style}] [bold]ToolResult[/bold] │ {self.name} │ [green]{tool_label}[/green]"
    )

    panel = Panel(highlighted, title=header, title_align="left", border_style="green", padding=(0, 1))

    if self.depth > 0:
        self._print_indented(panel, self.depth * SUBAGENT_INDENT_SPACES)
        return
    console.print(panel)

context_summarization_start

context_summarization_start(
    pct_used: float, cutoff: float
) -> None

Log context window summarization starting in an orange panel.

Parameters:

Name Type Description Default
pct_used float

Percentage of context window currently used (0.0-1.0)

required
cutoff float

The threshold that triggered summarization (0.0-1.0)

required
Source code in src/stirrup/utils/logging.py
def context_summarization_start(self, pct_used: float, cutoff: float) -> None:
    """Announce, in an orange panel, that context summarization is starting.

    Args:
        pct_used: Percentage of context window currently used (0.0-1.0)
        cutoff: The threshold that triggered summarization (0.0-1.0)
    """
    # Panel body as (text, style) segments, appended in display order
    segments = (
        ("Context window limit reached\n\n", "bold"),
        ("Used: ", "dim"),
        (f"{pct_used:.1%}", "bold orange1"),
        ("  │  ", "dim"),
        ("Threshold: ", "dim"),
        (f"{cutoff:.1%}", "bold"),
        ("\n\n", "dim"),
        ("Summarizing conversation history...", "italic"),
    )
    body = Text()
    for segment_text, segment_style in segments:
        body.append(segment_text, style=segment_style)

    panel = Panel(
        body,
        title="[bold orange1]📝 Context Summarization[/]",
        title_align="left",
        border_style="orange1",
        padding=(0, 1),
    )

    if self.depth > 0:
        self._print_indented(panel, self.depth * SUBAGENT_INDENT_SPACES)
        return
    console.print(panel)

context_summarization_complete

context_summarization_complete(
    summary: str, bridge: str
) -> None

Log the completed context summarization with summary content.

Parameters:

Name Type Description Default
summary str

The generated summary of the conversation

required
bridge str

The bridge message that will be used to continue the conversation

required
Source code in src/stirrup/utils/logging.py
def context_summarization_complete(self, summary: str, bridge: str) -> None:
    """Log the completed context summarization with summary content.

    The summary is always shown, truncated to 800 characters. The bridge
    message is extra detail and is appended (truncated to 200 characters)
    only when the logger is at DEBUG verbosity.

    Args:
        summary: The generated summary of the conversation
        bridge: The bridge message that will be used to continue the conversation
    """
    # Truncate long summaries for display
    summary_display = summary
    if len(summary_display) > 800:
        summary_display = summary_display[:800] + "..."

    # Build panel content
    content = Text()
    content.append("Summary:\n", style="bold")
    content.append(summary_display, style="white")

    # Extra detail belongs to the more verbose levels. The previous check
    # (`> logging.INFO`) was inverted and showed it only when the logger was
    # set to WARNING or quieter.
    if self._level <= logging.DEBUG:
        bridge_display = bridge
        if len(bridge_display) > 200:
            bridge_display = bridge_display[:200] + "..."
        content.append("\n\n")
        content.append("Bridge Message:\n", style="bold dim")
        content.append(bridge_display, style="dim italic")

    panel = Panel(
        content,
        title="[bold green]✓ Summary Generated[/]",
        title_align="left",
        border_style="green",
        padding=(0, 1),
    )

    if self.depth > 0:
        self._print_indented(panel, self.depth * SUBAGENT_INDENT_SPACES)
    else:
        console.print(panel)

AgentLoggerBase

Bases: ABC

Abstract base class for agent loggers.

Defines the interface that Agent uses for logging. Implement this to create custom loggers (e.g., for testing, file output, or monitoring services).

Properties are set by Agent after construction:

- name, model, max_turns, depth: Agent configuration
- finish_params, run_metadata, output_dir: Set before `__exit__` for final stats

__enter__ abstractmethod

__enter__() -> Self

Enter logging context. Called when agent session starts.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def __enter__(self) -> Self:
    """Open the logging context at the start of an agent session.

    Returns:
        The logger itself, for use as a context manager.
    """

__exit__ abstractmethod

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None

Exit logging context. Called when agent session ends.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Close the logging context at the end of an agent session.

    Args:
        exc_type: Type of the exception leaving the context, or None.
        exc_val: The exception instance leaving the context, or None.
        exc_tb: Traceback associated with the exception, if any.
    """

on_step abstractmethod

on_step(
    step: int,
    tool_calls: int = 0,
    input_tokens: int = 0,
    output_tokens: int = 0,
) -> None

Report step progress and stats during agent execution.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def on_step(
    self,
    step: int,
    tool_calls: int = 0,
    input_tokens: int = 0,
    output_tokens: int = 0,
) -> None:
    """Report step progress and stats during agent execution.

    Args:
        step: Step number being reported.
        tool_calls: Tool-call count to report (defaults to 0).
        input_tokens: Input-token count to report (defaults to 0).
        output_tokens: Output-token count to report (defaults to 0).

    NOTE(review): whether the counts are per-step or cumulative is not
    visible from this interface - confirm against the caller.
    """
    ...

assistant_message abstractmethod

assistant_message(
    turn: int,
    max_turns: int,
    assistant_message: AssistantMessage,
) -> None

Log an assistant message.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def assistant_message(
    self,
    turn: int,
    max_turns: int,
    assistant_message: AssistantMessage,
) -> None:
    """Log an assistant message.

    Agent.step calls this right after generation, and only when ``turn > 0``.

    Args:
        turn: Current turn number.
        max_turns: Maximum number of turns, passed along for display.
        assistant_message: The message produced by the LLM client.
    """
    ...

user_message abstractmethod

user_message(user_message: UserMessage) -> None

Log a user message.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def user_message(self, user_message: UserMessage) -> None:
    """Log a user message.

    Args:
        user_message: The user message to log.
    """
    ...

task_message abstractmethod

task_message(task: str | list[Any]) -> None

Log the initial task/prompt at the start of a run.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def task_message(self, task: str | list[Any]) -> None:
    """Log the initial task/prompt at the start of a run.

    Agent.run passes the ``content`` of the last initial message.

    Args:
        task: The task content, either a plain string or a list of content parts.
    """
    ...

tool_result abstractmethod

tool_result(tool_message: ToolMessage) -> None

Log a tool execution result.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def tool_result(self, tool_message: ToolMessage) -> None:
    """Log a tool execution result.

    Agent.step calls this once per tool call, immediately after the tool ran.

    Args:
        tool_message: The message holding the tool's output or error text.
    """
    ...

context_summarization_start abstractmethod

context_summarization_start(
    pct_used: float, cutoff: float
) -> None

Log that context summarization is starting.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def context_summarization_start(self, pct_used: float, cutoff: float) -> None:
    """Log that context summarization is starting.

    Args:
        pct_used: How much of the context window is in use.
        cutoff: The threshold that triggered summarization.

    NOTE(review): presumably both are fractions in the 0-1 range, like
    Agent's ``context_summarization_cutoff`` - confirm against the caller.
    """
    ...

context_summarization_complete abstractmethod

context_summarization_complete(
    summary: str, bridge: str
) -> None

Log completed context summarization.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def context_summarization_complete(self, summary: str, bridge: str) -> None:
    """Log completed context summarization.

    Agent.summarize_messages calls this with the summary text and the
    rendered bridge prompt.

    Args:
        summary: The summary produced by the LLM, as a string.
        bridge: The bridge message text built from the summary.
    """
    ...

debug abstractmethod

debug(message: str, *args: object) -> None

Log a debug message.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def debug(self, message: str, *args: object) -> None:
    """Log a debug message.

    Args:
        message: The message text.
        *args: Extra values accompanying the message.
            NOTE(review): presumably %-style format arguments as in the
            stdlib ``logging`` API - confirm with implementations.
    """
    ...

info abstractmethod

info(message: str, *args: object) -> None

Log an info message.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def info(self, message: str, *args: object) -> None:
    """Log an info message.

    Args:
        message: The message text.
        *args: Extra values accompanying the message.
            NOTE(review): presumably %-style format arguments as in the
            stdlib ``logging`` API - confirm with implementations.
    """
    ...

warning abstractmethod

warning(message: str, *args: object) -> None

Log a warning message.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def warning(self, message: str, *args: object) -> None:
    """Log a warning message.

    Args:
        message: The message text.
        *args: Extra values accompanying the message.
            NOTE(review): presumably %-style format arguments as in the
            stdlib ``logging`` API - confirm with implementations.
    """
    ...

error abstractmethod

error(message: str, *args: object) -> None

Log an error message.

Source code in src/stirrup/utils/logging.py
@abstractmethod
def error(self, message: str, *args: object) -> None:
    """Log an error message.

    Args:
        message: The message text.
        *args: Extra values accompanying the message.
            NOTE(review): presumably %-style format arguments as in the
            stdlib ``logging`` API - confirm with implementations.
    """
    ...

SessionState dataclass

SessionState(
    exit_stack: AsyncExitStack,
    exec_env: CodeExecToolProvider | None = None,
    output_dir: str | None = None,
    parent_exec_env: CodeExecToolProvider | None = None,
    depth: int = 0,
    uploaded_file_paths: list[str] = list(),
)

Per-session state for resource lifecycle management.

Kept minimal - only contains resources that need async lifecycle management (exit_stack, exec_env) and session-specific configuration (output_dir).

Tool availability is managed via Agent._active_tools (instance-scoped), and run results are stored on the agent instance temporarily.

For subagent file transfer:

- parent_exec_env: Reference to the parent's exec env (for cross-env transfers)
- depth: Agent depth (0 = root, >0 = subagent)
- output_dir: For the root agent, this is a local filesystem path. For subagents, this is a path within the parent's exec env.

SubAgentParams

Bases: BaseModel

Parameters for sub-agent tool invocation.

Agent

Agent(
    client: LLMClient,
    name: str,
    *,
    max_turns: int = AGENT_MAX_TURNS,
    system_prompt: str | None = None,
    tools: list[Tool | ToolProvider] | None = None,
    finish_tool: Tool[FinishParams, FinishMeta]
    | None = None,
    context_summarization_cutoff: float = CONTEXT_SUMMARIZATION_CUTOFF,
    run_sync_in_thread: bool = True,
    text_only_tool_responses: bool = True,
    logger: AgentLoggerBase | None = None,
)

Agent that executes tool-using loops with automatic context management.

Runs up to max_turns iterations of: LLM generation → tool execution → message accumulation. When conversation history exceeds context window limits, older messages are automatically condensed into a summary to preserve working memory.

The Agent can be used as an async context manager via .session() for automatic tool lifecycle management, logging, and file saving:

from stirrup.clients.chat_completions_client import ChatCompletionsClient

# Create client and agent
client = ChatCompletionsClient(model="gpt-5")
agent = Agent(client=client, name="assistant")

async with agent.session(output_dir="./output") as session:
    finish_params, history, metadata = await session.run("Your task here")

Initialize the agent with an LLM client and configuration.

Parameters:

Name Type Description Default
client LLMClient

LLM client for generating responses. Use ChatCompletionsClient for OpenAI/OpenAI-compatible APIs, or LiteLLMClient for other providers.

required
name str

Name of the agent (used for logging purposes)

required
max_turns int

Maximum number of turns before stopping

AGENT_MAX_TURNS
system_prompt str | None

System prompt to prepend to all runs (when using string prompts)

None
tools list[Tool | ToolProvider] | None

List of Tools and/or ToolProviders available to the agent. If None, uses DEFAULT_TOOLS. ToolProviders are automatically set up and torn down by Agent.session(). Use [*DEFAULT_TOOLS, extra_tool] to extend defaults.

None
finish_tool Tool[FinishParams, FinishMeta] | None

Tool used to signal task completion. Defaults to SIMPLE_FINISH_TOOL.

None
context_summarization_cutoff float

Fraction of context window (0-1) at which to trigger summarization

CONTEXT_SUMMARIZATION_CUTOFF
run_sync_in_thread bool

Execute synchronous tool executors in a separate thread

True
text_only_tool_responses bool

Extract images from tool responses as separate user messages

True
logger AgentLoggerBase | None

Optional logger instance. If None, creates AgentLogger() internally.

None
Source code in src/stirrup/core/agent.py
def __init__(
    self,
    client: LLMClient,
    name: str,
    *,
    max_turns: int = AGENT_MAX_TURNS,
    system_prompt: str | None = None,
    tools: list[Tool | ToolProvider] | None = None,
    finish_tool: Tool[FinishParams, FinishMeta] | None = None,
    # Agent options
    context_summarization_cutoff: float = CONTEXT_SUMMARIZATION_CUTOFF,
    run_sync_in_thread: bool = True,
    text_only_tool_responses: bool = True,
    # Logging
    logger: AgentLoggerBase | None = None,
) -> None:
    """Initialize the agent with an LLM client and configuration.

    Args:
        client: LLM client for generating responses. Use ChatCompletionsClient for
                OpenAI/OpenAI-compatible APIs, or LiteLLMClient for other providers.
        name: Name of the agent (used for logging purposes)
        max_turns: Maximum number of turns before stopping
        system_prompt: System prompt to prepend to all runs (when using string prompts)
        tools: List of Tools and/or ToolProviders available to the agent.
               If None, uses DEFAULT_TOOLS. ToolProviders are automatically
               set up and torn down by Agent.session().
               Use [*DEFAULT_TOOLS, extra_tool] to extend defaults.
        finish_tool: Tool used to signal task completion. Defaults to SIMPLE_FINISH_TOOL.
        context_summarization_cutoff: Fraction of context window (0-1) at which to trigger summarization
        run_sync_in_thread: Execute synchronous tool executors in a separate thread
        text_only_tool_responses: Extract images from tool responses as separate user messages
        logger: Optional logger instance. If None, creates AgentLogger() internally.

    Raises:
        ValueError: If ``name`` is not made up solely of 1-128 alphanumeric,
            underscore or hyphen characters.

    """
    # Validate agent name. fullmatch() is required here: with match(), the "$"
    # anchor also matches just before a trailing newline, so "agent\n" would
    # slip through validation.
    if not AGENT_NAME_PATTERN.fullmatch(name):
        raise ValueError(
            f"Invalid agent name '{name}'. "
            "Agent names must match pattern '^[a-zA-Z0-9_-]{1,128}$' "
            "(alphanumeric, underscores, hyphens only, 1-128 characters)."
        )

    self._client: LLMClient = client
    self._name = name
    self._max_turns = max_turns
    self._system_prompt = system_prompt
    self._tools = tools if tools is not None else DEFAULT_TOOLS
    self._finish_tool: Tool = finish_tool if finish_tool is not None else SIMPLE_FINISH_TOOL
    self._context_summarization_cutoff = context_summarization_cutoff
    self._run_sync_in_thread = run_sync_in_thread
    self._text_only_tool_responses = text_only_tool_responses

    # Logger (can be passed in or created here)
    self._logger: AgentLoggerBase = logger if logger is not None else AgentLogger()

    # Session configuration (set during session(), used in __aenter__)
    self._pending_output_dir: Path | None = None
    self._pending_input_files: str | Path | list[str | Path] | None = None

    # Instance-scoped state (populated during __aenter__, isolated per agent instance)
    self._active_tools: dict[str, Tool] = {}
    self._last_finish_params: Any = None  # FinishParams type parameter
    self._last_run_metadata: dict[str, list[Any]] = {}
    self._transferred_paths: list[str] = []  # Paths transferred to parent (for subagents)

name property

name: str

The name of this agent.

client property

client: LLMClient

The LLM client used by this agent.

tools property

tools: dict[str, Tool]

Currently active tools (available after entering session context).

finish_tool property

finish_tool: Tool

The finish tool used to signal task completion.

logger property

The logger instance used by this agent.

session

session(
    output_dir: Path | str | None = None,
    input_files: str
    | Path
    | list[str | Path]
    | None = None,
) -> Self

Configure a session and return self for use as async context manager.

Parameters:

Name Type Description Default
output_dir Path | str | None

Directory to save output files from finish_params.paths

None
input_files str | Path | list[str | Path] | None

Files to upload to the execution environment at session start. Accepts a single path or list of paths. Supports:

- File paths (str or Path)
- Directory paths (uploaded recursively)
- Glob patterns (e.g., "data/*.csv", "**/*.py")

Raises ValueError if no CodeExecToolProvider is configured or if a glob pattern matches no files.

None

Returns:

Type Description
Self

Self, for use with async with agent.session(...) as session:

Example

async with agent.session(output_dir="./output", input_files="data/*.csv") as session:
    result = await session.run("Analyze the CSV files")

Note

Multiple concurrent sessions from the same Agent instance are supported. Each session maintains isolated state via ContextVar.

Source code in src/stirrup/core/agent.py
def session(
    self,
    output_dir: Path | str | None = None,
    input_files: str | Path | list[str | Path] | None = None,
) -> Self:
    """Configure a session and return self for use as async context manager.

    Args:
        output_dir: Directory to save output files from finish_params.paths
        input_files: Files to upload to the execution environment at session start.
                    Accepts a single path or list of paths. Supports:
                    - File paths (str or Path)
                    - Directory paths (uploaded recursively)
                    - Glob patterns (e.g., "data/*.csv", "**/*.py")
                    Raises ValueError if no CodeExecToolProvider is configured
                    or if a glob pattern matches no files.

    Returns:
        Self, for use with `async with agent.session(...) as session:`

    Example:
        async with agent.session(output_dir="./output", input_files="data/*.csv") as session:
            result = await session.run("Analyze the CSV files")

    Note:
        Multiple concurrent sessions from the same Agent instance are supported.
        Each session maintains isolated state via ContextVar.

    """
    self._pending_output_dir = Path(output_dir) if output_dir else None
    self._pending_input_files = input_files
    return self

__aenter__ async

__aenter__() -> Self

Enter session context: set up tools, logging, and resources.

Creates a new SessionState and stores it in the _SESSION_STATE ContextVar, allowing concurrent sessions from the same Agent instance.

Source code in src/stirrup/core/agent.py
async def __aenter__(self) -> Self:
    """Enter session context: set up tools, logging, and resources.

    Creates a new SessionState and stores it in the _SESSION_STATE ContextVar,
    allowing concurrent sessions from the same Agent instance.

    Returns:
        Self, with the active tools populated and the logger context entered.

    Raises:
        ValueError: If more than one CodeExecToolProvider is configured, or if
            input files were requested but no CodeExecToolProvider is configured.
        RuntimeError: If any input file failed to upload.
    """
    exit_stack = AsyncExitStack()
    await exit_stack.__aenter__()

    # Get parent state if exists (for subagent file transfer)
    parent_state = _SESSION_STATE.get(None)

    current_depth = _PARENT_DEPTH.get()

    # Create session state and store in ContextVar
    state = SessionState(
        exit_stack=exit_stack,
        output_dir=str(self._pending_output_dir) if self._pending_output_dir else None,
        parent_exec_env=parent_state.exec_env if parent_state else None,
        depth=current_depth,
    )
    # Keep the token so the previous state can be restored if setup fails.
    state_token = _SESSION_STATE.set(state)

    try:
        # === TWO-PASS TOOL INITIALIZATION ===
        # First pass initializes CodeExecToolProvider so that dependent tools
        # (like ViewImageToolProvider) can access state.exec_env in second pass.
        active_tools: list[Tool] = []

        # First pass: Initialize CodeExecToolProvider (at most one allowed)
        code_exec_providers = [t for t in self._tools if isinstance(t, CodeExecToolProvider)]
        if len(code_exec_providers) > 1:
            raise ValueError(
                f"Agent can only have one CodeExecToolProvider, found {len(code_exec_providers)}: "
                f"{[type(p).__name__ for p in code_exec_providers]}"
            )

        if code_exec_providers:
            provider = code_exec_providers[0]
            result = await exit_stack.enter_async_context(provider)
            if isinstance(result, list):
                active_tools.extend(result)
            else:
                active_tools.append(result)
            state.exec_env = provider

        # Second pass: Initialize remaining ToolProviders and static Tools
        for tool in self._tools:
            if isinstance(tool, CodeExecToolProvider):
                continue  # Already processed in first pass

            if isinstance(tool, ToolProvider):
                # ToolProvider: enter context and get returned tool(s)
                result = await exit_stack.enter_async_context(tool)
                # Handle both single Tool and list[Tool] returns (e.g., MCPToolProvider)
                if isinstance(result, list):
                    active_tools.extend(result)
                else:
                    active_tools.append(result)
            else:
                # Static Tool, use directly
                active_tools.append(tool)

        # Build active tools dict with finish tool (stored on instance, not session)
        self._active_tools = {FINISH_TOOL_NAME: self._finish_tool}
        self._active_tools.update({t.name: t for t in active_tools})

        # Validate subagent code exec requirements (only at root level)
        if current_depth == 0:
            self._validate_subagent_code_exec_requirements()

        # Upload input files to exec_env if specified
        if self._pending_input_files:
            if not state.exec_env:
                raise ValueError("input_files specified but no CodeExecToolProvider configured")

            logger.debug(
                "[%s __aenter__] Uploading input files: %s, depth=%d, parent_exec_env=%s, parent_exec_env._temp_dir=%s",
                self._name,
                self._pending_input_files,
                state.depth,
                type(state.parent_exec_env).__name__ if state.parent_exec_env else None,
                getattr(state.parent_exec_env, "_temp_dir", "N/A") if state.parent_exec_env else None,
            )

            if state.depth > 0 and state.parent_exec_env:
                # SUBAGENT: Read files from parent's exec env, write to subagent's exec env
                # input_files are paths within the parent's environment.
                # A single str/Path must be wrapped before unpacking: "*" on a str
                # would pass every character as its own path, and a Path is not iterable.
                pending = self._pending_input_files
                parent_paths = [pending] if isinstance(pending, (str, Path)) else list(pending)
                result = await state.exec_env.upload_files(
                    *parent_paths,
                    source_env=state.parent_exec_env,
                )
            else:
                # ROOT AGENT: Read files from local filesystem
                resolved = self._resolve_input_files(self._pending_input_files)
                result = await state.exec_env.upload_files(*resolved)

            logger.debug(
                "[%s __aenter__] Upload result: uploaded=%s, failed=%s", self._name, result.uploaded, result.failed
            )

            # Store uploaded paths for system prompt
            state.uploaded_file_paths = [uf.dest_path for uf in result.uploaded]

            if result.failed:
                raise RuntimeError(f"Failed to upload files: {result.failed}")
        self._pending_input_files = None  # Clear pending state

        # Configure and enter logger context
        self._logger.name = self._name
        self._logger.model = self._client.model_slug
        self._logger.max_turns = self._max_turns
        # depth is already set (0 for main agent, passed in for sub-agents)
        self._logger.__enter__()

        return self

    except Exception:
        try:
            await exit_stack.__aexit__(None, None, None)
        finally:
            # __aexit__ will never run for this session, so do not leave the
            # ContextVar pointing at a state whose resources are already closed.
            _SESSION_STATE.reset(state_token)
        raise

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit session context: save files, cleanup resources.

File handling is depth-aware:

- Root agent (depth=0): Saves files to local filesystem output_dir
- Subagent (depth>0): Transfers files to parent's exec env at output_dir path

Source code in src/stirrup/core/agent.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit session context: save files, cleanup resources.

    File handling is depth-aware:
    - Root agent (depth=0): Saves files to local filesystem output_dir
    - Subagent (depth>0): Transfers files to parent's exec env at output_dir path

    Files are only handled when an output_dir is set, the last run produced
    finish params with a non-empty ``paths`` attribute, and an exec env exists.
    The logger context and the exit stack are always closed, even if saving
    files or closing the logger raises.

    Args:
        exc_type: Type of the exception leaving the ``async with`` body, or None.
        exc_val: The exception instance, or None.
        exc_tb: The traceback, or None.
    """
    state = _SESSION_STATE.get()

    try:
        # Save files from finish_params.paths based on depth
        if state.output_dir and self._last_finish_params and state.exec_env:
            paths = getattr(self._last_finish_params, "paths", None)
            if paths:
                if state.depth == 0:
                    # ROOT AGENT: Save to local filesystem
                    output_path = Path(state.output_dir)
                    output_path.mkdir(parents=True, exist_ok=True)
                    logger.debug(
                        "[%s] ROOT AGENT (depth=0): Saving %d file(s) to local filesystem: %s -> %s",
                        self._name,
                        len(paths),
                        paths,
                        output_path,
                    )
                    result = await state.exec_env.save_output_files(paths, output_path, dest_env=None)
                    logger.debug(
                        "[%s] ROOT AGENT: Saved %d file(s), failed %d",
                        self._name,
                        len(result.saved),
                        len(result.failed),
                    )
                    # Surface failures like the subagent branch does, instead of
                    # leaving them visible only at debug level.
                    if result.failed:
                        logger.warning("Failed to save some files to %s: %s", output_path, result.failed)
                else:
                    # SUBAGENT: Transfer to parent's exec env
                    if state.parent_exec_env:
                        logger.debug(
                            "[%s] SUBAGENT (depth=%d): Transferring %d file(s) to parent exec env: %s -> %s",
                            self._name,
                            state.depth,
                            len(paths),
                            paths,
                            state.output_dir,
                        )
                        result = await state.exec_env.save_output_files(
                            paths, state.output_dir, dest_env=state.parent_exec_env
                        )
                        # Store transferred paths for returning to parent
                        self._transferred_paths = [str(sf.output_path) for sf in result.saved]
                        logger.debug(
                            "[%s] SUBAGENT: Transferred %d file(s) to parent, failed %d. Paths: %s",
                            self._name,
                            len(result.saved),
                            len(result.failed),
                            self._transferred_paths,
                        )
                        if result.failed:
                            logger.warning("Failed to transfer some files to parent env: %s", result.failed)
                    else:
                        logger.warning(
                            "Subagent at depth %d has exec_env but no parent_exec_env. "
                            "Files will not be transferred.",
                            state.depth,
                        )
    finally:
        try:
            # Exit logger context
            self._logger.finish_params = self._last_finish_params
            self._logger.run_metadata = self._last_run_metadata
            self._logger.output_dir = str(state.output_dir) if state.output_dir else None
            self._logger.__exit__(exc_type, exc_val, exc_tb)
        finally:
            # Cleanup all async resources. Nested in its own finally so that a
            # failing logger cannot leak the tool providers held by the exit stack.
            await state.exit_stack.__aexit__(exc_type, exc_val, exc_tb)

run_tool async

run_tool(
    tool_call: ToolCall, run_metadata: dict[str, list[Any]]
) -> ToolMessage

Execute a single tool call with error handling for invalid JSON/arguments.

Returns a ToolMessage containing either the tool output or an error description. Metadata from the tool result is stored in the provided run_metadata dict.

Source code in src/stirrup/core/agent.py
async def run_tool(self, tool_call: ToolCall, run_metadata: dict[str, list[Any]]) -> ToolMessage:
    """Execute a single tool call with error handling for invalid JSON/arguments.

    Returns a ToolMessage containing either the tool output or an error description.
    Metadata from the tool result is stored in the provided run_metadata dict.

    Args:
        tool_call: The call requested by the model (name, raw JSON arguments, call id).
        run_metadata: Mapping of tool name to the metadata objects collected so far.
            Mutated in place: an entry is created for the tool name on first use
            and any metadata returned by the tool is appended to it.

    Returns:
        A ToolMessage for the call. ``args_was_valid`` is False only when a
        ValidationError was raised while parsing the arguments or running the tool.
        An unknown tool name yields an error message with ``args_was_valid`` True.
    """
    tool = self._active_tools.get(tool_call.name)
    result: ToolResult
    args_valid = True

    # Ensure tool is tracked in metadata dict (even if no metadata returned)
    if tool_call.name not in run_metadata:
        run_metadata[tool_call.name] = []

    if tool:
        try:
            # Parse parameters if the tool has them, otherwise use None
            params = (
                tool.parameters.model_validate_json(tool_call.arguments) if tool.parameters is not None else None
            )

            # Set parent depth for sub-agent tools to read
            prev_depth = _PARENT_DEPTH.set(self._logger.depth)
            try:
                if inspect.iscoroutinefunction(tool.executor):
                    result = await tool.executor(params)  # ty: ignore[invalid-await]
                elif self._run_sync_in_thread:
                    # ty: ignore - type checker doesn't understand iscoroutinefunction narrowing
                    result = await anyio.to_thread.run_sync(tool.executor, params)  # ty: ignore[unresolved-attribute]
                else:
                    # ty: ignore - iscoroutinefunction check above ensures this is sync
                    result = tool.executor(params)  # ty: ignore[invalid-assignment]
            finally:
                # Always restore the previous depth, even if the executor raised
                _PARENT_DEPTH.reset(prev_depth)

            # Store metadata if present
            if result.metadata is not None:
                run_metadata[tool_call.name].append(result.metadata)
        except ValidationError:
            LOGGER.debug(
                "LLMClient tried to use the tool %s but the tool arguments are not valid: %r",
                tool_call.name,
                tool_call.arguments,
            )
            result = ToolResult(content="Tool arguments are not valid")
            args_valid = False
    else:
        # Lazy %-formatting, consistent with the debug call above: the message
        # is only rendered when debug logging is enabled.
        LOGGER.debug(
            "LLMClient tried to use the tool %s which is not in the tools list",
            tool_call.name,
        )
        result = ToolResult(content=f"{tool_call.name} is not a valid tool")

    return ToolMessage(
        content=result.content,
        tool_call_id=tool_call.tool_call_id,
        name=tool_call.name,
        args_was_valid=args_valid,
    )

step async

step(
    messages: list[ChatMessage],
    run_metadata: dict[str, list[Any]],
    turn: int = 0,
    max_turns: int = 0,
) -> tuple[
    AssistantMessage, list[ToolMessage], ToolCall | None
]

Execute one agent step: generate assistant message and run any requested tool calls.

Parameters:

Name Type Description Default
messages list[ChatMessage]

Current conversation messages

required
run_metadata dict[str, list[Any]]

Metadata storage for tool results

required
turn int

Current turn number (1-indexed) for logging

0
max_turns int

Maximum turns for logging

0

Returns the assistant message, tool execution results, and finish tool call (if present).

Source code in src/stirrup/core/agent.py
async def step(
    self,
    messages: list[ChatMessage],
    run_metadata: dict[str, list[Any]],
    turn: int = 0,
    max_turns: int = 0,
) -> tuple[AssistantMessage, list[ToolMessage], ToolCall | None]:
    """Run a single agent step: one LLM generation followed by its tool calls.

    Tool calls are executed one after another, in the order the model
    requested them, and each result is logged as soon as it is available.

    Args:
        messages: Current conversation messages
        run_metadata: Metadata storage for tool results
        turn: Current turn number (1-indexed) for logging; the assistant
            message is only logged when this is greater than 0
        max_turns: Maximum turns for logging

    Returns the assistant message, tool execution results, and finish tool call (if present).

    """
    reply = await self._client.generate(messages, self._active_tools)

    # Log assistant message immediately
    if turn > 0:
        self._logger.assistant_message(turn, max_turns, reply)

    requested_calls = reply.tool_calls
    if not requested_calls:
        # Nothing to execute this step
        return reply, [], None

    # First call addressed to the finish tool, if the model issued one
    finish_call: ToolCall | None = None
    for candidate in requested_calls:
        if candidate.name == FINISH_TOOL_NAME:
            finish_call = candidate
            break

    outcomes: list[ToolMessage] = []
    for requested in requested_calls:
        outcome = await self.run_tool(requested, run_metadata)
        outcomes.append(outcome)

        # Log tool result immediately
        self._logger.tool_result(outcome)

    return reply, outcomes, finish_call

summarize_messages async

summarize_messages(
    messages: list[ChatMessage],
) -> list[ChatMessage]

Condense message history using LLM to stay within context window.

Source code in src/stirrup/core/agent.py
async def summarize_messages(self, messages: list[ChatMessage]) -> list[ChatMessage]:
    """Replace the conversation with an LLM-written summary to free context space.

    The result keeps every message that precedes the first assistant message,
    followed by a bridge message carrying the summary and a short
    acknowledgement message.
    """
    # Everything before the first assistant reply is kept verbatim
    preamble: list[ChatMessage] = []
    for msg in messages:
        if isinstance(msg, AssistantMessage):
            break
        preamble.append(msg)

    # We need to pass the tools to the client so that it has context of tools used in the conversation
    summary = await self._client.generate(
        [*messages, UserMessage(content=MESSAGE_SUMMARIZER)],
        self._active_tools,
    )

    bridge_text = MESSAGE_SUMMARIZER_BRIDGE_TEMPLATE.format(summary=summary.content)
    bridge_msg = UserMessage(content=bridge_text)
    ack_msg = UserMessage(content="Got it, thanks!")

    # Log the completed summary (the logger expects plain strings)
    raw_summary = summary.content
    summary_text = raw_summary if isinstance(raw_summary, str) else str(raw_summary)
    self._logger.context_summarization_complete(summary_text, bridge_text)

    return [*preamble, bridge_msg, ack_msg]

run async

run(
    init_msgs: str | list[ChatMessage],
    *,
    depth: int | None = None,
) -> tuple[
    FinishParams | None,
    list[list[ChatMessage]],
    dict[str, list[Any]],
]

Execute the agent loop until finish tool is called or max_turns reached.

A base system prompt is automatically prepended to all runs, including:

- Agent purpose and max_turns info
- List of input files (if provided via session())
- User's custom system_prompt (if configured in `__init__`)

Parameters:

Name Type Description Default
init_msgs str | list[ChatMessage]

Either a string prompt (converted to UserMessage) or a list of ChatMessage to extend the conversation after the system prompt.

required
depth int | None

Logging depth for sub-agent runs. If provided, updates logger.depth for this run.

None

Returns:

Type Description
FinishParams | None

Tuple of (finish params, message history, run metadata).

list[list[ChatMessage]]

finish params is None if max_turns reached.

dict[str, list[Any]]

run metadata maps tool/agent names to lists of metadata returned by each call.

Example
Simple string prompt

await agent.run("Analyze this data and create a report")

Multiple messages

await agent.run([
    UserMessage(content="First, read the data"),
    AssistantMessage(content="I've read the data file..."),
    UserMessage(content="Now analyze it"),
])

Source code in src/stirrup/core/agent.py
async def run(
    self,
    init_msgs: str | list[ChatMessage],
    *,
    depth: int | None = None,
) -> tuple[FinishParams | None, list[list[ChatMessage]], dict[str, list[Any]]]:
    """Drive the agent turn by turn until it finishes or runs out of turns.

    Every run opens with the system prompt produced by ``_build_system_prompt()``
    (base prompt, input files, and the user's custom instructions); the caller's
    messages are placed after it.

    Args:
        init_msgs: A plain string (wrapped in a UserMessage) or a list of
                  ChatMessage appended after the system prompt.
        depth: Logging depth for sub-agent runs. When given, it is assigned to
               the logger's depth before the run starts.

    Returns:
        Tuple of (finish params, message history, run metadata).
        finish params stays None when the loop ends without a validated finish
        call (e.g. max_turns reached).
        message history holds one message list per conversation segment; a new
        segment begins each time the context is summarized.
        run metadata maps tool/agent names to lists of metadata returned by each
        call, plus this run's aggregated token usage under "token_usage".

    Example:
        # Simple string prompt
        await agent.run("Analyze this data and create a report")

        # Multiple messages
        await agent.run([
            UserMessage(content="First, read the data"),
            AssistantMessage(content="I've read the data file..."),
            UserMessage(content="Now analyze it"),
        ])

    """
    # The conversation always starts with the fully assembled system prompt.
    msgs: list[ChatMessage] = [SystemMessage(content=self._build_system_prompt())]
    if not isinstance(init_msgs, str):
        msgs.extend(init_msgs)
    else:
        msgs.append(UserMessage(content=init_msgs))

    # Sub-agent runs pass an explicit depth so console output is indented correctly.
    if depth is not None:
        self._logger.depth = depth

    # The most recent message is what gets reported as the task.
    self._logger.task_message(msgs[-1].content)

    # Warnings are only surfaced for the top-level agent, and only by AgentLogger.
    if self._logger.depth == 0 and isinstance(self._logger, AgentLogger):
        if run_warnings := self._collect_warnings():
            self._logger.warnings_message(run_warnings)

    on_step = self._logger.on_step

    # Kept local so that each run() invocation has its own metadata store.
    run_metadata: dict[str, list[Any]] = {}
    full_msg_history: list[list[ChatMessage]] = []
    finish_params: FinishParams | None = None

    # Running totals reported through the step callback.
    tool_call_total = 0
    input_token_total = 0
    output_token_total = 0

    for turn in range(1, self._max_turns + 1):
        # From the second turn on, remind the model how many turns are left
        # once 30 or fewer remain.
        turns_left = self._max_turns - turn + 1
        if turn != 1 and turns_left <= 30:
            reminder = _num_turns_remaining_msg(turns_left)
            msgs.append(reminder)
            self._logger.user_message(reminder)

        assistant_message, tool_messages, finish_call = await self.step(
            msgs,
            run_metadata,
            turn=turn,
            max_turns=self._max_turns,
        )

        usage = assistant_message.token_usage
        tool_call_total += len(tool_messages)
        input_token_total += usage.input
        output_token_total += usage.output

        if on_step:
            on_step(turn, tool_call_total, input_token_total, output_token_total)

        # When enabled, images inside tool responses are moved into separate user messages.
        user_messages: list[UserMessage] = []
        if self._text_only_tool_responses:
            tool_messages, user_messages = _handle_text_only_tool_responses(tool_messages)

        for extracted_msg in user_messages:
            self._logger.user_message(extracted_msg)

        msgs.extend([assistant_message, *tool_messages, *user_messages])

        if finish_call:
            try:
                parsed_arguments = json.loads(finish_call.arguments)
                params_model = self._finish_tool.parameters
                if params_model is not None:
                    finish_params = params_model.model_validate(parsed_arguments)
            except (json.JSONDecodeError, ValidationError, TypeError):
                # An invalid finish call does not end the run; keep looping.
                LOGGER.debug(
                    "Agent tried to use the finish tool but the tool call is not valid: %r",
                    finish_call.arguments,
                )
            else:
                break

        # Summarize once the context fills past the cutoff, except after the final turn.
        context_fraction = usage.total / self._client.max_tokens
        is_final_turn = turn == self._max_turns
        if context_fraction >= self._context_summarization_cutoff and not is_final_turn:
            self._logger.context_summarization_start(context_fraction, self._context_summarization_cutoff)
            full_msg_history.append(msgs)
            msgs = await self.summarize_messages(msgs)
    else:
        # Reached only when the loop was never broken out of by a valid finish call.
        LOGGER.error(
            f"Maximum number of turns reached: {self._max_turns}. The agent was not able to finish the task. Consider increasing the max_turns parameter.",
        )

    full_msg_history.append(msgs)

    # Record this agent's own aggregated token usage alongside the tool metadata.
    agent_token_usage = _get_total_token_usage(full_msg_history)
    run_metadata.setdefault("token_usage", []).append(agent_token_usage)

    # Kept on the instance so __aexit__ can read the outcome of the last run.
    self._last_finish_params = finish_params
    self._last_run_metadata = run_metadata

    return finish_params, full_msg_history, run_metadata

to_tool

to_tool(
    *,
    description: str = DEFAULT_SUB_AGENT_DESCRIPTION,
    system_prompt: str | None = None,
) -> Tool[SubAgentParams, SubAgentMetadata]

Convert this Agent to a Tool for use as a sub-agent.

Parameters:

Name Type Description Default
description str

Tool description shown to the parent agent

DEFAULT_SUB_AGENT_DESCRIPTION
system_prompt str | None

Optional system prompt to prepend when running

None

Returns:

Type Description
Tool[SubAgentParams, SubAgentMetadata]

Tool that executes this agent when called, returning SubAgentMetadata containing token usage, message history, and any metadata from tools the sub-agent used.

Source code in src/stirrup/core/agent.py
def to_tool(
    self,
    *,
    description: str = DEFAULT_SUB_AGENT_DESCRIPTION,
    system_prompt: str | None = None,
) -> Tool[SubAgentParams, SubAgentMetadata]:
    """Wrap this Agent in a Tool so that a parent agent can call it as a sub-agent.

    Args:
        description: Tool description shown to the parent agent.
        system_prompt: Optional system message placed ahead of the task message
            on every invocation.

    Returns:
        A Tool named after this agent. Its executor runs the agent inside its own
        session and returns a ToolResult whose SubAgentMetadata carries the
        sub-agent's message history and run metadata.

    """
    agent = self  # the executor closure reaches the agent through this name

    async def sub_agent_executor(params: SubAgentParams) -> ToolResult[SubAgentMetadata]:
        """Run the sub-agent on ``params.task`` and package its outcome as a ToolResult.

        The sub-agent opens its own session rather than reusing the parent's, so that:
        1. Tool isolation - each agent only sees its own tools (fixes recursive sub-agent bug)
        2. Proper ToolProvider lifecycle - sub-agent's ToolProviders are initialized
        3. Correct logging - logger context is entered for proper output formatting

        Any exception raised while running is converted into an error result with
        empty metadata instead of propagating to the parent.
        """
        # The sub-agent sits one level below whatever depth the caller is at.
        sub_agent_depth = _PARENT_DEPTH.get() + 1

        # Remember the parent's session state so it can be put back afterwards;
        # sibling sub-agents must see the parent's state, not a previous sibling's.
        parent_session_state = _SESSION_STATE.get(None)
        parent_env = parent_session_state.exec_env if parent_session_state else None
        logger.debug(
            "[%s] PRE-SESSION: _SESSION_STATE=%s, exec_env=%s, exec_env._temp_dir=%s",
            agent.name,
            id(parent_session_state) if parent_session_state else None,
            type(parent_env).__name__ if parent_env else None,
            getattr(parent_env, "_temp_dir", "N/A") if parent_env else None,
        )

        # The depth must be published BEFORE the session is entered so that
        # __aenter__ reads the correct depth for SessionState.depth.
        prev_depth = _PARENT_DEPTH.set(sub_agent_depth)
        try:
            init_msgs: list[ChatMessage] = [SystemMessage(content=system_prompt)] if system_prompt else []
            init_msgs.append(UserMessage(content=params.task))

            # output_dir is a path within the parent's exec env (not the local filesystem);
            # files are transferred to the parent's env at __aexit__ via save_output_files(dest_env=parent).
            async with agent.session(
                output_dir=".",  # Path in parent's exec env
                input_files=list(params.input_files) if params.input_files else None,  # ty: ignore[invalid-argument-type]
            ) as agent_session:
                # Force the logger depth so console output is indented for this level.
                agent_session._logger.depth = sub_agent_depth  # noqa: SLF001

                finish_params, msg_history, run_metadata = await agent_session.run(init_msgs)

                # Newest assistant message that has real content (not just tool calls).
                last_assistant_msg: AssistantMessage | None = next(
                    (
                        msg
                        for msg_group in reversed(msg_history)
                        for msg in reversed(msg_group)
                        if isinstance(msg, AssistantMessage) and msg.content
                    ),
                    None,
                )

                # Assemble the response text from the pieces that are available.
                content_parts: list[str] = []

                if last_assistant_msg and last_assistant_msg.content:
                    reply = last_assistant_msg.content
                    # List-valued content is flattened to one line per block.
                    content_parts.append(
                        "\n".join(str(block) for block in reply) if isinstance(reply, list) else reply
                    )

                # Finish params often carry the actual result, so include them when non-empty.
                if finish_params is not None and (finish_dict := finish_params.model_dump()):
                    content_parts.append(f"Finish params: {finish_dict}")

                # NOTE(review): this attribute is read while the session is still open;
                # the surrounding comments say it is set in __aexit__ — confirm the value
                # is already populated at this point.
                if transferred_paths := agent_session._transferred_paths:  # noqa: SLF001
                    content_parts.append(f"Files available in your environment: {transferred_paths}")

                if content_parts:
                    response_text = "\n".join(content_parts)
                    result_content = (
                        f"<sub_agent_result>"
                        f"\n<response>{response_text}</response>"
                        f"\n<finished>{finish_params is not None}</finished>"
                        f"\n</sub_agent_result>"
                    )
                else:
                    result_content = "<sub_agent_result>\n<error>No assistant message or finish params found</error>\n</sub_agent_result>"

                return ToolResult(
                    content=result_content,
                    metadata=SubAgentMetadata(
                        message_history=msg_history,
                        run_metadata=run_metadata,
                    ),
                )

        except Exception as e:
            # Failures are reported to the parent as an error result with empty metadata.
            return ToolResult(
                content=f"<sub_agent_result>\n<error>{e!s}</error>\n</sub_agent_result>",
                metadata=SubAgentMetadata(
                    message_history=[],
                    run_metadata={},
                ),
            )
        finally:
            # DEBUG: record what _SESSION_STATE looks like once the sub-agent session is over.
            post_session_state = _SESSION_STATE.get(None)
            post_env = post_session_state.exec_env if post_session_state else None
            logger.debug(
                "[%s] POST-SESSION: _SESSION_STATE=%s, exec_env=%s, exec_env._temp_dir=%s",
                agent.name,
                id(post_session_state) if post_session_state else None,
                type(post_env).__name__ if post_env else None,
                getattr(post_env, "_temp_dir", "N/A") if post_env else None,
            )

            # Put the parent's depth back.
            _PARENT_DEPTH.reset(prev_depth)
            # Put the parent's session state back so the next sibling sub-agent sees it.
            if parent_session_state is not None:
                _SESSION_STATE.set(parent_session_state)

    return Tool[SubAgentParams, SubAgentMetadata](
        name=self._name,
        description=description,
        parameters=SubAgentParams,
        executor=sub_agent_executor,  # ty: ignore[invalid-argument-type]
    )

_num_turns_remaining_msg

_num_turns_remaining_msg(
    number_of_turns_remaining: int,
) -> UserMessage

Create a user message warning the agent about remaining turns before max_turns is reached.

Source code in src/stirrup/core/agent.py
def _num_turns_remaining_msg(number_of_turns_remaining: int) -> UserMessage:
    """Build the user message that tells the agent how many turns it has left.

    Args:
        number_of_turns_remaining: Turns left, counting the one about to start.

    Returns:
        A UserMessage with a dedicated "last turn" text when exactly one turn
        remains, otherwise a reminder that states the remaining count.

    """
    if number_of_turns_remaining == 1:
        reminder_text = "This is the last turn. Please finish the task by calling the finish tool."
    else:
        reminder_text = f"You have {number_of_turns_remaining} turns remaining to complete the task. Please continue. Remember you will need a separate turn to finish the task."
    return UserMessage(content=reminder_text)

_handle_text_only_tool_responses

_handle_text_only_tool_responses(
    tool_messages: list[ToolMessage],
) -> tuple[list[ToolMessage], list[UserMessage]]

Extract image blocks from tool messages and convert them to user messages for text-only models.

Source code in src/stirrup/core/agent.py
def _handle_text_only_tool_responses(tool_messages: list[ToolMessage]) -> tuple[list[ToolMessage], list[UserMessage]]:
    """Move image blocks out of tool messages into follow-up user messages for text-only models.

    Only tool messages whose content is a list are inspected. Each ImageContentBlock
    in such a list is replaced in place by a placeholder string, and a UserMessage
    carrying the image is collected. String blocks are left as they are.

    Args:
        tool_messages: Tool messages to scan; their list contents are mutated in place.

    Returns:
        The same ``tool_messages`` list and the UserMessages created for the
        extracted images, in the order the images were found.

    Raises:
        NotImplementedError: If a block is neither a string nor an ImageContentBlock.

    """
    image_messages: list[UserMessage] = []
    for tool_msg in tool_messages:
        blocks = tool_msg.content
        if not isinstance(blocks, list):
            continue
        for position, block in enumerate(blocks):
            if isinstance(block, str):
                continue
            if not isinstance(block, ImageContentBlock):
                raise NotImplementedError(f"Unsupported content block: {type(block)}")
            image_messages.append(
                UserMessage(content=[f"Here is the image for tool call {tool_msg.tool_call_id}", block]),
            )
            # Same-index replacement, so the enumeration above is unaffected.
            blocks[position] = f"Done! The User will provide the image for tool call {tool_msg.tool_call_id}"

    return tool_messages, image_messages

_get_total_token_usage

_get_total_token_usage(
    messages: list[list[ChatMessage]],
) -> TokenUsage

Aggregate token usage across all assistant messages in grouped conversation history.

Parameters:

Name Type Description Default
messages list[list[ChatMessage]]

List of message groups, where each group represents a segment of conversation.

required
Source code in src/stirrup/core/agent.py
def _get_total_token_usage(messages: list[list[ChatMessage]]) -> TokenUsage:
    """Add up the token usage of every assistant message across all message groups.

    Args:
        messages: List of message groups, where each group represents a segment of conversation.

    Returns:
        A TokenUsage obtained by adding each AssistantMessage's ``token_usage`` to a
        fresh ``TokenUsage()``; non-assistant messages are ignored.

    """
    running_total = TokenUsage()
    for message in chain.from_iterable(messages):
        if isinstance(message, AssistantMessage):
            running_total = running_total + message.token_usage
    return running_total