Skip to content

API Reference

This section documents the main classes and helpers exposed by Pygent. The content is generated from the package docstrings.

Agent

Interactive assistant handling messages and tool execution.

Source code in pygent/agent.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@dataclass
class Agent:
    """Interactive assistant handling messages and tool execution.

    The conversation lives in ``history``; entries are plain dicts or
    ``openai_compat.Message`` objects.  When ``history_file`` is set, the
    whole history is rewritten to that file after every appended message.
    """

    runtime: Runtime = field(default_factory=Runtime)
    model: Model = field(default_factory=_default_model)
    model_name: str = DEFAULT_MODEL
    persona: Persona = field(default_factory=lambda: DEFAULT_PERSONA)
    system_msg: str = field(default_factory=lambda: build_system_msg(DEFAULT_PERSONA))
    history: List[Dict[str, Any]] = field(default_factory=list)
    history_file: Optional[pathlib.Path] = field(default_factory=_default_history_file)

    def __post_init__(self) -> None:
        """Initialize defaults after dataclass construction.

        Rebuilds an empty system prompt from the persona, reloads a saved
        conversation from ``history_file`` when that file exists, and seeds
        an empty history with the system message.
        """
        if not self.system_msg:
            self.system_msg = build_system_msg(self.persona)
        if self.history_file and isinstance(self.history_file, (str, pathlib.Path)):
            self.history_file = pathlib.Path(self.history_file)
            if self.history_file.is_file():
                try:
                    with self.history_file.open("r", encoding="utf-8") as fh:
                        data = json.load(fh)
                except Exception:
                    # Best effort: an unreadable or corrupt file means
                    # "start with no saved history", not a failed constructor.
                    data = []
                if not isinstance(data, list):
                    # Valid JSON that is not a list (a number, a string, an
                    # object) is treated like a corrupt file.
                    data = []
                self.history = [
                    openai_compat.parse_message(m) if isinstance(m, dict) else m
                    for m in data
                ]
        if not self.history:
            self.append_history({"role": "system", "content": self.system_msg})

    def _message_dict(self, msg: Any) -> Dict[str, Any]:
        """Return ``msg`` as a plain dict.

        Dicts are returned unchanged; ``openai_compat.Message`` objects are
        converted to ``role``/``content`` (plus ``tool_calls`` when present).

        Raises:
            TypeError: if ``msg`` is neither of those types.
        """
        if isinstance(msg, dict):
            return msg
        if isinstance(msg, openai_compat.Message):
            data = {"role": msg.role, "content": msg.content}
            if msg.tool_calls:
                data["tool_calls"] = [asdict(tc) for tc in msg.tool_calls]
            return data
        raise TypeError(f"Unsupported message type: {type(msg)!r}")

    def _save_history(self) -> None:
        """Write the full history to ``history_file`` as JSON, if one is set."""
        if self.history_file:
            self.history_file.parent.mkdir(parents=True, exist_ok=True)
            with self.history_file.open("w", encoding="utf-8") as fh:
                json.dump([self._message_dict(m) for m in self.history], fh)

    def append_history(self, msg: Any) -> None:
        """Append ``msg`` to the history and persist the history."""
        self.history.append(msg)
        self._save_history()

    def refresh_system_message(self) -> None:
        """Update the system prompt based on the current tool registry.

        The first history entry is updated only when its role is ``system``.
        That entry may be a dict or a parsed message object (history loaded
        from disk is passed through ``openai_compat.parse_message``), so both
        shapes are handled.
        """
        self.system_msg = build_system_msg(self.persona)
        if not self.history:
            return
        head = self.history[0]
        if isinstance(head, dict):
            if head.get("role") == "system":
                head["content"] = self.system_msg
        elif getattr(head, "role", None) == "system":
            # Swap in a fresh dict instead of mutating the parsed object, whose
            # mutability is not known here.
            self.history[0] = {"role": "system", "content": self.system_msg}

    def step(self, user_msg: str):
        """Execute one round of interaction with the model.

        Appends ``user_msg`` to the history, asks the model for a reply, and
        either executes every requested tool call (recording and printing each
        output) or prints the reply as Markdown.

        Returns:
            The parsed assistant message.
        """

        self.refresh_system_message()
        self.append_history({"role": "user", "content": user_msg})

        assistant_raw = self.model.chat(
            self.history, self.model_name, tools.TOOL_SCHEMAS
        )
        assistant_msg = openai_compat.parse_message(assistant_raw)
        self.append_history(assistant_msg)

        if assistant_msg.tool_calls:
            for call in assistant_msg.tool_calls:
                output = tools.execute_tool(call, self.runtime)
                self.append_history(
                    {"role": "tool", "content": output, "tool_call_id": call.id}
                )
                console.print(
                    Panel(
                        output,
                        title=f"{self.persona.name} tool:{call.function.name}",
                    )
                )
        else:
            # A reply without tool calls may still carry no text; render an
            # empty panel rather than handing ``None`` to Markdown.
            markdown_response = Markdown(assistant_msg.content or "")
            console.print(
                Panel(
                    markdown_response,
                    title=f"Resposta de {self.persona.name}",
                    title_align="left",
                    border_style="cyan",
                )
            )
        return assistant_msg

    def run_until_stop(
        self,
        user_msg: str,
        max_steps: int = 20,
        step_timeout: Optional[float] = None,
        max_time: Optional[float] = None,
    ) -> None:
        """Run steps until ``stop`` is called or limits are reached.

        Args:
            user_msg: Message for the first step; later steps send
                ``"continue"``.
            max_steps: Upper bound on the number of steps.
            step_timeout: Seconds allowed per step; falls back to the
                ``PYGENT_STEP_TIMEOUT`` environment variable.
            max_time: Seconds allowed overall; falls back to
                ``PYGENT_TASK_TIMEOUT``.

        Both limits are checked between steps, so a running step is never
        interrupted.  When a limit trips, a ``[timeout after ...]`` system
        message is appended and ``self._timed_out`` is set to ``True``.
        """

        if step_timeout is None:
            env = os.getenv("PYGENT_STEP_TIMEOUT")
            step_timeout = float(env) if env else None
        if max_time is None:
            env = os.getenv("PYGENT_TASK_TIMEOUT")
            max_time = float(env) if env else None

        msg = user_msg
        start = time.monotonic()
        self._timed_out = False
        for _ in range(max_steps):
            if max_time is not None and time.monotonic() - start > max_time:
                self.append_history(
                    {"role": "system", "content": f"[timeout after {max_time}s]"}
                )
                self._timed_out = True
                break
            step_start = time.monotonic()
            assistant_msg = self.step(msg)
            if (
                step_timeout is not None
                and time.monotonic() - step_start > step_timeout
            ):
                self.append_history(
                    {"role": "system", "content": f"[timeout after {step_timeout}s]"}
                )
                self._timed_out = True
                break
            calls = assistant_msg.tool_calls or []
            # Both the ``stop`` and the ``continue`` tool end the loop.
            if any(c.function.name in ("stop", "continue") for c in calls):
                break
            msg = "continue"

__post_init__()

Initialize defaults after dataclass construction.

Source code in pygent/agent.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __post_init__(self) -> None:
    """Initialize defaults after dataclass construction.

    Rebuilds an empty system prompt from the persona, reloads a saved
    conversation from ``history_file`` when that file exists, and seeds an
    empty history with the system message.
    """
    if not self.system_msg:
        self.system_msg = build_system_msg(self.persona)
    if self.history_file and isinstance(self.history_file, (str, pathlib.Path)):
        self.history_file = pathlib.Path(self.history_file)
        if self.history_file.is_file():
            try:
                with self.history_file.open("r", encoding="utf-8") as fh:
                    data = json.load(fh)
            except Exception:
                # Best effort: an unreadable or corrupt file means "start with
                # no saved history", not a failed constructor.
                data = []
            if not isinstance(data, list):
                # Valid JSON that is not a list (a number, a string, an
                # object) is treated like a corrupt file.
                data = []
            self.history = [
                openai_compat.parse_message(m) if isinstance(m, dict) else m
                for m in data
            ]
    if not self.history:
        self.append_history({"role": "system", "content": self.system_msg})

refresh_system_message()

Update the system prompt based on the current tool registry.

Source code in pygent/agent.py
104
105
106
107
108
def refresh_system_message(self) -> None:
    """Update the system prompt based on the current tool registry.

    The first history entry is updated only when its role is ``system``.
    That entry may be a plain dict or a parsed message object, so both shapes
    are handled instead of assuming a dict.
    """
    self.system_msg = build_system_msg(self.persona)
    if not self.history:
        return
    head = self.history[0]
    if isinstance(head, dict):
        if head.get("role") == "system":
            head["content"] = self.system_msg
    elif getattr(head, "role", None) == "system":
        # Swap in a fresh dict instead of mutating the parsed object, whose
        # mutability is not known here.
        self.history[0] = {"role": "system", "content": self.system_msg}

run_until_stop(user_msg, max_steps=20, step_timeout=None, max_time=None)

Run steps until stop is called or limits are reached.

Source code in pygent/agent.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def run_until_stop(
    self,
    user_msg: str,
    max_steps: int = 20,
    step_timeout: Optional[float] = None,
    max_time: Optional[float] = None,
) -> None:
    """Drive ``step`` repeatedly until a stop condition is met.

    The first step receives ``user_msg``; every later step receives
    ``"continue"``.  The loop ends when the assistant calls the ``stop`` or
    ``continue`` tool, when ``max_steps`` steps have run, or when a time
    limit trips.  Limits left as ``None`` are read from the
    ``PYGENT_STEP_TIMEOUT`` / ``PYGENT_TASK_TIMEOUT`` environment variables.

    Time limits are evaluated between steps only.  When one trips, a
    ``[timeout after ...]`` system message is appended to the history and
    ``self._timed_out`` is set to ``True``.
    """

    if step_timeout is None:
        raw_step = os.getenv("PYGENT_STEP_TIMEOUT")
        step_timeout = None if not raw_step else float(raw_step)
    if max_time is None:
        raw_total = os.getenv("PYGENT_TASK_TIMEOUT")
        max_time = None if not raw_total else float(raw_total)

    next_msg = user_msg
    began = time.monotonic()
    self._timed_out = False
    for _ in range(max_steps):
        # Overall budget is checked before starting another step.
        over_budget = max_time is not None and time.monotonic() - began > max_time
        if over_budget:
            notice = {"role": "system", "content": f"[timeout after {max_time}s]"}
            self.append_history(notice)
            self._timed_out = True
            break

        step_began = time.monotonic()
        reply = self.step(next_msg)

        # Per-step budget is checked only after the step has returned.
        if step_timeout is not None:
            if time.monotonic() - step_began > step_timeout:
                notice = {
                    "role": "system",
                    "content": f"[timeout after {step_timeout}s]",
                }
                self.append_history(notice)
                self._timed_out = True
                break

        requested = reply.tool_calls or []
        wants_halt = False
        for call in requested:
            if call.function.name in ("stop", "continue"):
                wants_halt = True
                break
        if wants_halt:
            break
        next_msg = "continue"

step(user_msg)

Execute one round of interaction with the model.

Source code in pygent/agent.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def step(self, user_msg: str):
    """Execute one round of interaction with the model.

    Appends ``user_msg`` to the history, asks the model for a reply, and
    either executes every requested tool call (recording and printing each
    output) or prints the reply as Markdown.

    Returns:
        The parsed assistant message.
    """

    self.refresh_system_message()
    self.append_history({"role": "user", "content": user_msg})

    assistant_raw = self.model.chat(
        self.history, self.model_name, tools.TOOL_SCHEMAS
    )
    assistant_msg = openai_compat.parse_message(assistant_raw)
    self.append_history(assistant_msg)

    if assistant_msg.tool_calls:
        for call in assistant_msg.tool_calls:
            output = tools.execute_tool(call, self.runtime)
            self.append_history(
                {"role": "tool", "content": output, "tool_call_id": call.id}
            )
            console.print(
                Panel(
                    output,
                    title=f"{self.persona.name} tool:{call.function.name}",
                )
            )
    else:
        # A reply without tool calls may still carry no text; render an empty
        # panel rather than handing ``None`` to Markdown.
        markdown_response = Markdown(assistant_msg.content or "")
        console.print(
            Panel(
                markdown_response,
                title=f"Resposta de {self.persona.name}",
                title_align="left",
                border_style="cyan",
            )
        )
    return assistant_msg

Runtime

Executes commands in a Docker container or locally if Docker is unavailable.

If workspace or the environment variable PYGENT_WORKSPACE is set, the given directory is used as the base workspace and kept across sessions.

Source code in pygent/runtime.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class Runtime:
    """Executes commands in a Docker container or locally if Docker is unavailable.

    If ``workspace`` or the environment variable ``PYGENT_WORKSPACE`` is set,
    the given directory is used as the base workspace and kept across sessions.
    """

    def __init__(
        self,
        image: Optional[str] = None,
        use_docker: Optional[bool] = None,
        initial_files: Optional[list[str]] = None,
        workspace: Optional[Union[str, Path]] = None,
    ) -> None:
        """Create the workspace and, when possible, start a container.

        Args:
            image: Container image; defaults to ``PYGENT_IMAGE`` or
                ``python:3.12-slim``.
            use_docker: Force Docker on or off; when ``None`` the
                ``PYGENT_USE_DOCKER`` variable decides (``"0"`` disables).
            initial_files: Paths copied into the workspace; defaults to the
                ``os.pathsep``-separated ``PYGENT_INIT_FILES`` variable.
            workspace: Persistent workspace directory; defaults to
                ``PYGENT_WORKSPACE``, else a fresh ``agent_<id>`` directory
                under the current directory that ``cleanup`` removes.
        """
        env_ws = os.getenv("PYGENT_WORKSPACE")
        if workspace is None and env_ws:
            workspace = env_ws
        if workspace is None:
            self.base_dir = Path.cwd() / f"agent_{uuid.uuid4().hex[:8]}"
            self._persistent = False
        else:
            self.base_dir = Path(workspace).expanduser()
            self._persistent = True
        self.base_dir.mkdir(parents=True, exist_ok=True)
        if initial_files is None:
            env_files = os.getenv("PYGENT_INIT_FILES")
            if env_files:
                initial_files = [f.strip() for f in env_files.split(os.pathsep) if f.strip()]
        self._initial_files = initial_files or []
        self.image = image or os.getenv("PYGENT_IMAGE", "python:3.12-slim")
        env_opt = os.getenv("PYGENT_USE_DOCKER")
        if use_docker is None:
            use_docker = (env_opt != "0") if env_opt is not None else True
        # ``docker`` is falsy when the SDK is unavailable (assumption based on
        # the ``bool(docker)`` check; its definition is outside this block).
        self._use_docker = bool(docker) and use_docker
        if self._use_docker:
            try:
                self.client = docker.from_env()
                self.container = self.client.containers.run(
                    self.image,
                    name=f"pygent-{uuid.uuid4().hex[:8]}",
                    command="sleep infinity",
                    volumes={str(self.base_dir): {"bind": "/workspace", "mode": "rw"}},
                    working_dir="/workspace",
                    detach=True,
                    tty=True,
                    network_disabled=True,
                    mem_limit="512m",
                    pids_limit=256,
                )
            except Exception:
                # Any failure to start the container falls back to local
                # execution instead of failing construction.
                self._use_docker = False
        if not self._use_docker:
            self.client = None
            self.container = None

        # populate workspace with initial files
        for fp in self._initial_files:
            src = Path(fp).expanduser()
            dest = self.base_dir / src.name
            if src.is_dir():
                shutil.copytree(src, dest, dirs_exist_ok=True)
            elif src.exists():
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy(src, dest)

    @property
    def use_docker(self) -> bool:
        """Return ``True`` if commands run inside a Docker container."""
        return self._use_docker

    # ---------------- public API ----------------
    def bash(self, cmd: str, timeout: int = 30) -> str:
        """Run a command in the container or locally and return the output.

        The executed command is always included in the returned string so the
        caller can display what was run.  Failures are reported in the
        returned text (``[error] ...`` / ``[timeout after ...]``) rather than
        raised.  Output bytes that are not valid text are replaced instead of
        turning the whole result into an error.
        """
        if self._use_docker and self.container is not None:
            try:
                # NOTE(review): the Docker SDK's ``Container.exec_run`` does
                # not appear to document a ``timeout`` parameter - confirm
                # that this call is accepted by the installed SDK.
                res = self.container.exec_run(
                    cmd,
                    workdir="/workspace",
                    demux=True,
                    tty=False,
                    stdin=False,
                    timeout=timeout,
                )
                stdout, stderr = (
                    res.output if isinstance(res.output, tuple) else (res.output, b"")
                )
                output = (stdout or b"").decode(errors="replace") + (
                    stderr or b""
                ).decode(errors="replace")
                return f"$ {cmd}\n{output}"
            except Exception as exc:
                return f"$ {cmd}\n[error] {exc}"
        try:
            proc = subprocess.run(
                cmd,
                shell=True,
                cwd=self.base_dir,
                capture_output=True,
                text=True,
                errors="replace",
                stdin=subprocess.DEVNULL,
                timeout=timeout,
            )
            return f"$ {cmd}\n{proc.stdout + proc.stderr}"
        except subprocess.TimeoutExpired:
            return f"$ {cmd}\n[timeout after {timeout}s]"
        except Exception as exc:
            return f"$ {cmd}\n[error] {exc}"

    def write_file(self, path: Union[str, Path], content: str) -> str:
        """Write ``content`` as UTF-8 to ``path`` inside the workspace.

        Missing parent directories are created.  Returns a ``Wrote <path>``
        message with the workspace-relative path.
        """
        p = self.base_dir / path
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content, encoding="utf-8")
        return f"Wrote {p.relative_to(self.base_dir)}"

    def read_file(self, path: Union[str, Path], binary: bool = False) -> str:
        """Return the contents of a file relative to the workspace.

        Text is returned decoded; with ``binary=True``, or when the bytes
        cannot be decoded, the content is returned base64-encoded.  A missing
        file yields a ``file <path> not found`` message instead of raising.
        """
        import base64

        p = self.base_dir / path
        if not p.exists():
            try:
                shown = p.relative_to(self.base_dir)
            except ValueError:
                # ``path`` points outside the workspace (e.g. it is absolute),
                # so there is no workspace-relative form to report.
                shown = path
            return f"file {shown} not found"
        data = p.read_bytes()
        if not binary:
            try:
                return data.decode()
            except UnicodeDecodeError:
                pass  # fall through to the base64 representation
        return base64.b64encode(data).decode()

    def upload_file(self, src: Union[str, Path], dest: Optional[Union[str, Path]] = None) -> str:
        """Copy a local file or directory into the workspace.

        ``dest`` is relative to the workspace and defaults to the source's
        name.  A missing source yields a ``file <src> not found`` message.
        """

        src_path = Path(src).expanduser()
        if not src_path.exists():
            return f"file {src} not found"
        target = self.base_dir / (Path(dest) if dest else src_path.name)
        if src_path.is_dir():
            shutil.copytree(src_path, target, dirs_exist_ok=True)
        else:
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src_path, target)
        return f"Uploaded {target.relative_to(self.base_dir)}"

    def export_file(self, path: Union[str, Path], dest: Union[str, Path]) -> str:
        """Copy a file or directory from the workspace to a local path.

        A missing source yields a ``file <path> not found`` message.
        """

        src = self.base_dir / path
        if not src.exists():
            return f"file {path} not found"
        dest_path = Path(dest).expanduser()
        if src.is_dir():
            shutil.copytree(src, dest_path, dirs_exist_ok=True)
        else:
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src, dest_path)
        return f"Exported {src.relative_to(self.base_dir)}"

    def cleanup(self) -> None:
        """Stop the container, if any, and delete a non-persistent workspace."""
        if self._use_docker and self.container is not None:
            try:
                self.container.kill()
            finally:
                # Removal is attempted even when ``kill`` raised.
                self.container.remove(force=True)
        if not self._persistent:
            shutil.rmtree(self.base_dir, ignore_errors=True)

use_docker property

Return True if commands run inside a Docker container.

bash(cmd, timeout=30)

Run a command in the container or locally and return the output.

The executed command is always included in the returned string so the caller can display what was run.

Source code in pygent/runtime.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def bash(self, cmd: str, timeout: int = 30) -> str:
    """Run a command in the container or locally and return the output.

    The executed command is always included in the returned string so the
    caller can display what was run.  Failures are reported in the returned
    text (``[error] ...`` / ``[timeout after ...]``) rather than raised.
    Output bytes that are not valid text are replaced instead of turning the
    whole result into an error.
    """
    if self._use_docker and self.container is not None:
        try:
            # NOTE(review): the Docker SDK's ``Container.exec_run`` does not
            # appear to document a ``timeout`` parameter - confirm that this
            # call is accepted by the installed SDK.
            res = self.container.exec_run(
                cmd,
                workdir="/workspace",
                demux=True,
                tty=False,
                stdin=False,
                timeout=timeout,
            )
            stdout, stderr = (
                res.output if isinstance(res.output, tuple) else (res.output, b"")
            )
            output = (stdout or b"").decode(errors="replace") + (
                stderr or b""
            ).decode(errors="replace")
            return f"$ {cmd}\n{output}"
        except Exception as exc:
            return f"$ {cmd}\n[error] {exc}"
    try:
        proc = subprocess.run(
            cmd,
            shell=True,
            cwd=self.base_dir,
            capture_output=True,
            text=True,
            errors="replace",
            stdin=subprocess.DEVNULL,
            timeout=timeout,
        )
        return f"$ {cmd}\n{proc.stdout + proc.stderr}"
    except subprocess.TimeoutExpired:
        return f"$ {cmd}\n[timeout after {timeout}s]"
    except Exception as exc:
        return f"$ {cmd}\n[error] {exc}"

export_file(path, dest)

Copy a file or directory from the workspace to a local path.

Source code in pygent/runtime.py
166
167
168
169
170
171
172
173
174
175
176
177
178
def export_file(self, path: Union[str, Path], dest: Union[str, Path]) -> str:
    """Copy a workspace file or directory out to a local destination.

    ``path`` is resolved against the workspace; ``dest`` has ``~`` expanded.
    Directories are copied recursively (merging into an existing directory);
    for single files the destination's parent directories are created first.
    Returns ``Exported <path>``, or ``file <path> not found`` when the source
    does not exist.
    """

    origin = self.base_dir / path
    if origin.exists():
        target = Path(dest).expanduser()
        if not origin.is_dir():
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(origin, target)
        else:
            shutil.copytree(origin, target, dirs_exist_ok=True)
        relative = origin.relative_to(self.base_dir)
        return f"Exported {relative}"
    return f"file {path} not found"

read_file(path, binary=False)

Return the contents of a file relative to the workspace.

Source code in pygent/runtime.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def read_file(self, path: Union[str, Path], binary: bool = False) -> str:
    """Return the contents of a file relative to the workspace.

    Text is returned decoded; with ``binary=True``, or when the bytes cannot
    be decoded, the content is returned base64-encoded.  A missing file
    yields a ``file <path> not found`` message instead of raising, including
    when ``path`` points outside the workspace.
    """
    import base64

    p = self.base_dir / path
    if not p.exists():
        try:
            shown = p.relative_to(self.base_dir)
        except ValueError:
            # ``path`` points outside the workspace (e.g. it is absolute), so
            # there is no workspace-relative form to report.
            shown = path
        return f"file {shown} not found"
    data = p.read_bytes()
    if not binary:
        try:
            return data.decode()
        except UnicodeDecodeError:
            pass  # fall through to the base64 representation
    return base64.b64encode(data).decode()

upload_file(src, dest=None)

Copy a local file or directory into the workspace.

Source code in pygent/runtime.py
152
153
154
155
156
157
158
159
160
161
162
163
164
def upload_file(self, src: Union[str, Path], dest: Optional[Union[str, Path]] = None) -> str:
    """Bring a local file or directory into the workspace.

    ``src`` has ``~`` expanded.  ``dest`` is taken relative to the workspace
    and defaults to the source's own name.  Directories are copied
    recursively (merging into an existing directory); for single files the
    target's parent directories are created first.  Returns
    ``Uploaded <path>``, or ``file <src> not found`` when the source does not
    exist.
    """

    origin = Path(src).expanduser()
    if not origin.exists():
        return f"file {src} not found"

    if dest:
        landing = self.base_dir / Path(dest)
    else:
        landing = self.base_dir / origin.name

    if not origin.is_dir():
        landing.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(origin, landing)
    else:
        shutil.copytree(origin, landing, dirs_exist_ok=True)

    relative = landing.relative_to(self.base_dir)
    return f"Uploaded {relative}"

TaskManager

Launch agents asynchronously and track their progress.

Source code in pygent/task_manager.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class TaskManager:
    """Launch agents asynchronously and track their progress."""

    def __init__(
        self,
        agent_factory: Optional[Callable[..., "Agent"]] = None,
        max_tasks: Optional[int] = None,
        personas: Optional[list[Persona]] = None,
    ) -> None:
        """Configure the manager.

        Args:
            agent_factory: Callable producing an agent, called with the
                persona (or with no arguments if that raises ``TypeError``).
                Defaults to ``Agent(persona=...)``.
            max_tasks: Maximum number of concurrently running tasks; defaults
                to ``PYGENT_MAX_TASKS`` or 3.
            personas: Personas rotated through for delegated tasks.  When not
                given they are read from ``PYGENT_TASK_PERSONAS_JSON``, then
                ``PYGENT_TASK_PERSONAS``; if that still yields no persona, a
                single default persona is built from ``PYGENT_PERSONA_NAME``
                and ``PYGENT_PERSONA``.
        """
        from .agent import Agent  # local import to avoid circular dependency

        env_max = os.getenv("PYGENT_MAX_TASKS")
        self.max_tasks = max_tasks if max_tasks is not None else int(env_max or "3")
        if agent_factory is None:
            self.agent_factory = lambda p=None: Agent(persona=p)
        else:
            self.agent_factory = agent_factory
        env_personas_json = os.getenv("PYGENT_TASK_PERSONAS_JSON")
        if personas is None and env_personas_json:
            try:
                data = json.loads(env_personas_json)
                if isinstance(data, list):
                    personas = [
                        Persona(p.get("name", ""), p.get("description", ""))
                        for p in data
                        if isinstance(p, dict)
                    ]
            except Exception:
                # Malformed JSON falls through to the next persona source.
                personas = None
        env_personas = os.getenv("PYGENT_TASK_PERSONAS")
        if personas is None and env_personas:
            personas = [
                Persona(p.strip(), "")
                for p in env_personas.split(os.pathsep)
                if p.strip()
            ]
        if not personas:
            # Covers both "nothing configured" and an empty list (for example
            # a JSON value of ``[]``); an empty rotation would make
            # ``start_task`` divide by zero when picking a persona.
            personas = [
                Persona(
                    os.getenv("PYGENT_PERSONA_NAME", "Pygent"),
                    os.getenv("PYGENT_PERSONA", "a sandboxed coding assistant."),
                )
            ]
        self.personas = personas
        self._persona_idx = 0
        self.tasks: Dict[str, Task] = {}
        self._lock = threading.Lock()

    def start_task(
        self,
        prompt: str,
        parent_rt: Runtime,
        files: Optional[list[str]] = None,
        parent_depth: int = 0,
        step_timeout: Optional[float] = None,
        task_timeout: Optional[float] = None,
        persona: Union[Persona, str, None] = None,
    ) -> str:
        """Create a new agent and run ``prompt`` asynchronously.

        ``persona`` overrides the default rotation used for delegated tasks.

        Args:
            prompt: Initial message given to the new agent.
            parent_rt: Runtime of the delegating agent; the task workspace is
                created beneath its ``base_dir``.
            files: Workspace-relative paths copied from the parent workspace
                into the task workspace.
            parent_depth: Delegation depth of the caller.
            step_timeout: Per-step limit in seconds; defaults to
                ``PYGENT_STEP_TIMEOUT`` or 5 minutes.
            task_timeout: Overall limit in seconds; defaults to
                ``PYGENT_TASK_TIMEOUT`` or 20 minutes.
            persona: A ``Persona``, or a persona name looked up in
                ``self.personas`` (an unknown name creates a new persona with
                an empty description).

        Returns:
            The identifier of the started task.

        Raises:
            RuntimeError: if ``parent_depth`` is 1 or more, or if
                ``max_tasks`` tasks are already running.
        """

        if parent_depth >= 1:
            raise RuntimeError("nested delegation is not allowed")

        with self._lock:
            active = sum(t.status == "running" for t in self.tasks.values())
            if active >= self.max_tasks:
                raise RuntimeError(f"max {self.max_tasks} tasks reached")

        if step_timeout is None:
            env = os.getenv("PYGENT_STEP_TIMEOUT")
            step_timeout = float(env) if env else 60 * 5  # default 5 minutes
        if task_timeout is None:
            env = os.getenv("PYGENT_TASK_TIMEOUT")
            task_timeout = float(env) if env else 60 * 20  # default 20 minutes

        if persona is None:
            persona = self.personas[self._persona_idx % len(self.personas)]
            self._persona_idx += 1
        elif isinstance(persona, str):
            match = next((p for p in self.personas if p.name == persona), None)
            persona = match or Persona(persona, "")
        try:
            agent = self.agent_factory(persona)
        except TypeError:
            # Factories that take no persona argument are called bare.
            agent = self.agent_factory()

        from .runtime import Runtime
        # Discard whatever runtime the factory created; the task gets its own
        # workspace under the parent's.
        if getattr(agent, "runtime", None) is not None:
            try:
                agent.runtime.cleanup()
            except Exception:
                pass
        task_dir = parent_rt.base_dir / f"task_{uuid.uuid4().hex[:8]}"
        agent.runtime = Runtime(use_docker=parent_rt.use_docker, workspace=task_dir)
        setattr(agent, "persona", persona)
        if not getattr(agent, "system_msg", None):
            from .agent import build_system_msg  # lazy import

            agent.system_msg = build_system_msg(persona)
        setattr(agent.runtime, "task_depth", parent_depth + 1)
        if files:
            for fp in files:
                src = parent_rt.base_dir / fp
                dest = agent.runtime.base_dir / fp
                if src.is_dir():
                    shutil.copytree(src, dest, dirs_exist_ok=True)
                elif src.exists():
                    dest.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy(src, dest)
        task_id = uuid.uuid4().hex[:8]
        task = Task(id=task_id, agent=agent, thread=None)  # type: ignore[arg-type]

        def run() -> None:
            # Thread body: run the agent and record the outcome on the task.
            try:
                agent.run_until_stop(
                    prompt,
                    step_timeout=step_timeout,
                    max_time=task_timeout,
                )
                if getattr(agent, "_timed_out", False):
                    task.status = f"timeout after {task_timeout}s"
                else:
                    task.status = "finished"
            except Exception as exc:  # pragma: no cover - error propagation
                task.status = f"error: {exc}"

        t = threading.Thread(target=run, daemon=True)
        task.thread = t
        with self._lock:
            self.tasks[task_id] = task
        t.start()
        return task_id

    def status(self, task_id: str) -> str:
        """Return the status string of ``task_id``, or a not-found message."""
        with self._lock:
            task = self.tasks.get(task_id)
        if not task:
            return f"Task {task_id} not found"
        return task.status

    def collect_file(
        self, rt: Runtime, task_id: str, path: str, dest: Optional[str] = None
    ) -> str:
        """Copy a file or directory from a task workspace into ``rt``.

        ``dest`` is relative to ``rt``'s workspace and defaults to ``path``.
        Returns a ``Retrieved <path>`` message, or a not-found message when
        the task or the source path does not exist.
        """

        with self._lock:
            task = self.tasks.get(task_id)
        if not task:
            return f"Task {task_id} not found"
        src = task.agent.runtime.base_dir / path
        if not src.exists():
            return f"file {path} not found"
        dest_path = rt.base_dir / (dest or path)
        if src.is_dir():
            shutil.copytree(src, dest_path, dirs_exist_ok=True)
        else:
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src, dest_path)
        return f"Retrieved {dest_path.relative_to(rt.base_dir)}"

collect_file(rt, task_id, path, dest=None)

Copy a file or directory from a task workspace into rt.

Source code in pygent/task_manager.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def collect_file(
    self, rt: Runtime, task_id: str, path: str, dest: Optional[str] = None
) -> str:
    """Pull a file or directory out of a task's workspace into ``rt``.

    The task is looked up under the manager's lock.  ``dest`` is relative to
    ``rt``'s workspace and defaults to ``path``.  Directories are copied
    recursively (merging into an existing directory); for single files the
    destination's parent directories are created first.  Returns
    ``Retrieved <path>``, or a not-found message when either the task or the
    source path is missing.
    """

    with self._lock:
        found = self.tasks.get(task_id)
    if not found:
        return f"Task {task_id} not found"

    origin = found.agent.runtime.base_dir / path
    if not origin.exists():
        return f"file {path} not found"

    landing = rt.base_dir / (dest or path)
    if not origin.is_dir():
        landing.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(origin, landing)
    else:
        shutil.copytree(origin, landing, dirs_exist_ok=True)

    relative = landing.relative_to(rt.base_dir)
    return f"Retrieved {relative}"

start_task(prompt, parent_rt, files=None, parent_depth=0, step_timeout=None, task_timeout=None, persona=None)

Create a new agent and run prompt asynchronously.

persona overrides the default rotation used for delegated tasks.

Source code in pygent/task_manager.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def start_task(
    self,
    prompt: str,
    parent_rt: Runtime,
    files: Optional[list[str]] = None,
    parent_depth: int = 0,
    step_timeout: Optional[float] = None,
    task_timeout: Optional[float] = None,
    persona: Union[Persona, str, None] = None,
) -> str:
    """Spawn a delegated agent and run ``prompt`` on a background thread.

    The new agent receives its own runtime whose workspace is a fresh
    ``task_<hex>`` directory under ``parent_rt.base_dir``. Entries of
    ``files`` that exist in the parent workspace are copied into it
    (missing entries are skipped).

    ``step_timeout`` and ``task_timeout`` default to the
    ``PYGENT_STEP_TIMEOUT`` / ``PYGENT_TASK_TIMEOUT`` environment
    variables, then to 5 and 20 minutes respectively.

    ``persona`` overrides the default rotation used for delegated tasks.
    A string is matched by name against ``self.personas``; when nothing
    matches, ``Persona(persona, "")`` is used instead.

    Returns the id under which the task was registered. Raises
    ``RuntimeError`` when ``parent_depth`` is 1 or more, or when
    ``self.max_tasks`` tasks are already running.
    """

    if parent_depth >= 1:
        raise RuntimeError("nested delegation is not allowed")

    with self._lock:
        running = len([t for t in self.tasks.values() if t.status == "running"])
        if running >= self.max_tasks:
            raise RuntimeError(f"max {self.max_tasks} tasks reached")

    if step_timeout is None:
        raw_step = os.getenv("PYGENT_STEP_TIMEOUT")
        if raw_step:
            step_timeout = float(raw_step)
        else:
            step_timeout = 60 * 5  # default 5 minutes
    if task_timeout is None:
        raw_task = os.getenv("PYGENT_TASK_TIMEOUT")
        if raw_task:
            task_timeout = float(raw_task)
        else:
            task_timeout = 60 * 20  # default 20 minutes

    if isinstance(persona, str):
        known = next((p for p in self.personas if p.name == persona), None)
        persona = known if known else Persona(persona, "")
    elif persona is None:
        # Round-robin through the configured personas.
        persona = self.personas[self._persona_idx % len(self.personas)]
        self._persona_idx += 1

    # Factories may or may not accept the persona as an argument.
    try:
        agent = self.agent_factory(persona)
    except TypeError:
        agent = self.agent_factory()

    from .runtime import Runtime

    # Best effort: dispose of whatever runtime the factory attached
    # before swapping in the task-specific one.
    if getattr(agent, "runtime", None) is not None:
        try:
            agent.runtime.cleanup()
        except Exception:
            pass
    workspace = parent_rt.base_dir / f"task_{uuid.uuid4().hex[:8]}"
    agent.runtime = Runtime(use_docker=parent_rt.use_docker, workspace=workspace)
    agent.persona = persona
    if not getattr(agent, "system_msg", None):
        from .agent import build_system_msg  # lazy import

        agent.system_msg = build_system_msg(persona)
    agent.runtime.task_depth = parent_depth + 1

    for rel in files or ():
        origin = parent_rt.base_dir / rel
        target = agent.runtime.base_dir / rel
        if origin.is_dir():
            shutil.copytree(origin, target, dirs_exist_ok=True)
        elif origin.exists():
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(origin, target)

    task_id = uuid.uuid4().hex[:8]
    task = Task(id=task_id, agent=agent, thread=None)  # type: ignore[arg-type]

    def worker_main() -> None:
        # Thread body: run the agent and record the outcome on the task.
        try:
            agent.run_until_stop(
                prompt,
                step_timeout=step_timeout,
                max_time=task_timeout,
            )
            timed_out = getattr(agent, "_timed_out", False)
            task.status = (
                f"timeout after {task_timeout}s" if timed_out else "finished"
            )
        except Exception as exc:  # pragma: no cover - error propagation
            task.status = f"error: {exc}"

    worker = threading.Thread(target=worker_main, daemon=True)
    task.thread = worker
    # Register before starting so the task is visible once it runs.
    with self._lock:
        self.tasks[task_id] = task
    worker.start()
    return task_id

Tools

Tool registry and helper utilities.

clear_tools()

Remove all registered tools globally.

Source code in pygent/tools.py
189
190
191
192
def clear_tools() -> None:
    """Empty both global registries: the callables and their schemas."""
    for registry in (TOOLS, TOOL_SCHEMAS):
        registry.clear()

execute_tool(call, rt)

Dispatch a tool call.

Source code in pygent/tools.py
55
56
57
58
59
60
61
62
def execute_tool(call: Any, rt: Runtime) -> str:  # pragma: no cover
    """Dispatch a tool call to the callable registered under its name.

    ``call`` must expose ``function.name`` and ``function.arguments``,
    the latter being a JSON-encoded object of keyword arguments. A
    missing, empty or whitespace-only payload is treated as "no
    arguments" rather than being handed to the JSON parser, so tools
    without parameters can still be invoked.

    Returns the tool's result, or a warning string when no tool with
    that name is registered. Malformed (non-blank) JSON still raises
    ``json.JSONDecodeError``.
    """
    name = call.function.name
    raw_args = call.function.arguments
    # A blank payload is not valid JSON; interpret it as an empty object.
    args: Dict[str, Any] = (
        json.loads(raw_args) if raw_args and raw_args.strip() else {}
    )
    func = TOOLS.get(name)
    if func is None:
        return f"⚠️ unknown tool {name}"
    return func(rt, **args)

register_tool(name, description, parameters, func)

Register a new callable tool.

Source code in pygent/tools.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def register_tool(
    name: str, description: str, parameters: Dict[str, Any], func: Callable[..., str]
) -> None:
    """Add ``func`` to the global tool registry under ``name``.

    Besides storing the callable in ``TOOLS``, a matching
    ``{"type": "function", ...}`` schema entry built from ``name``,
    ``description`` and ``parameters`` is appended to ``TOOL_SCHEMAS``.

    Raises ``ValueError`` when ``name`` is already registered.
    """
    if name in TOOLS:
        raise ValueError(f"tool {name} already registered")

    spec = {
        "name": name,
        "description": description,
        "parameters": parameters,
    }
    TOOLS[name] = func
    TOOL_SCHEMAS.append({"type": "function", "function": spec})

remove_tool(name)

Unregister a specific tool.

Source code in pygent/tools.py
202
203
204
205
206
207
208
209
210
211
def remove_tool(name: str) -> None:
    """Drop the tool called ``name`` from the global registry.

    The callable is removed from ``TOOLS`` and the first schema entry in
    ``TOOL_SCHEMAS`` whose function name matches is removed as well.

    Raises ``ValueError`` when ``name`` is not registered.
    """
    if name not in TOOLS:
        raise ValueError(f"tool {name} not registered")
    del TOOLS[name]

    # Locate the first schema describing this tool, if there is one.
    position = next(
        (
            idx
            for idx, entry in enumerate(TOOL_SCHEMAS)
            if entry.get("function", {}).get("name") == name
        ),
        None,
    )
    if position is not None:
        TOOL_SCHEMAS.pop(position)

reset_tools()

Restore the default built-in tools.

Source code in pygent/tools.py
195
196
197
198
199
def reset_tools() -> None:
    """Discard every registered tool and reinstall the built-in set.

    The built-in schemas are deep-copied so later edits to
    ``TOOL_SCHEMAS`` entries do not touch ``BUILTIN_TOOL_SCHEMAS``.
    """
    clear_tools()
    for tool_name, tool_func in BUILTIN_TOOLS.items():
        TOOLS[tool_name] = tool_func
    fresh_schemas = deepcopy(BUILTIN_TOOL_SCHEMAS)
    TOOL_SCHEMAS.extend(fresh_schemas)

tool(name, description, parameters)

Decorator for registering a tool.

Source code in pygent/tools.py
45
46
47
48
49
50
51
52
def tool(name: str, description: str, parameters: Dict[str, Any]):
    """Return a decorator that registers the decorated function as a tool.

    The function is passed to ``register_tool`` together with ``name``,
    ``description`` and ``parameters`` and is then returned unchanged,
    so it stays directly callable.
    """

    def register_and_return(func: Callable[..., str]) -> Callable[..., str]:
        register_tool(name, description, parameters, func=func)
        return func

    return register_and_return