All Examples
SHIPIT Agent's Cognitive Architectures provide autonomous, self-directing agent capabilities out of the box. These are pre-built, high-level agent loops designed for complex tasks that require planning, reflection, or dynamic tool creation.
All deep agents support MCP servers, built-in tools (web search, code execution, file operations), real-time streaming, and the full agent configuration. Use .with_builtins() on any deep agent for instant access to all capabilities.
Runnable examples
- python examples/11_deep_agents.py — all deep agents in one script
- python examples/08_structured_output.py — Pydantic & JSON schema output
- python examples/09_pipeline.py — sequential, parallel, conditional
- python examples/10_agent_team.py — multi-agent team with streaming
- python examples/12_advanced_memory.py — all memory types
- Notebooks: notebooks/14_deep_agents.ipynb, notebooks/16_deep_agents_streaming.ipynb, notebooks/17_deep_agents_real_world.ipynb, notebooks/18_deep_agents_with_memory.ipynb
Goal Agent — Autonomous Goal Decomposition
GoalAgent decomposes goals into sub-tasks, executes them using tools, and tracks progress against explicit success criteria. It self-evaluates after each step.
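The plan, execute, evaluate cycle described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not SHIPIT Agent's implementation: `run_goal_loop` and its `plan`, `execute`, and `evaluate` callables are hypothetical names, and the planner and evaluator are stubbed with simple functions instead of LLM calls. Only the result field names and status strings are borrowed from the usage examples in this section.

```python
from typing import Callable, List


def run_goal_loop(
    criteria: List[str],
    plan: Callable[[List[str]], List[str]],
    execute: Callable[[str], str],
    evaluate: Callable[[str, List[str]], bool],
    max_steps: int = 5,
) -> dict:
    """Hypothetical sketch: plan sub-tasks, run one per step, re-check criteria."""
    outputs: List[str] = []
    met = [False] * len(criteria)
    steps = 0
    while steps < max_steps and not all(met):
        # Re-plan using only the criteria that are still unmet.
        unmet = [c for c, ok in zip(criteria, met) if not ok]
        task = plan(unmet)[0]
        outputs.append(execute(task))
        steps += 1
        # Self-evaluate every criterion against everything produced so far.
        met = [evaluate(c, outputs) for c in criteria]
    status = "completed" if all(met) else ("partial" if any(met) else "failed")
    return {"goal_status": status, "criteria_met": met, "steps_taken": steps}


# Stub planner, executor, and evaluator so the sketch runs without an LLM.
demo = run_goal_loop(
    criteria=["asyncio", "trio"],
    plan=lambda unmet: [f"Write about {c}" for c in unmet],
    execute=lambda task: task.lower(),
    evaluate=lambda criterion, outs: any(criterion in o for o in outs),
)
print(demo)
```

The `max_steps` bound is what keeps the loop from running forever when a criterion can never be satisfied; in that case the sketch reports "partial" or "failed" instead of "completed".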
graph TD
A[Goal] --> B[Plan Sub-tasks]
B --> C[Execute Task]
C --> D[Evaluate Result]
D --> E{Goal Met?}
E -- No --> B
E -- Yes --> F[Success]
Basic usage
from shipit_agent.deep import GoalAgent, Goal
agent = GoalAgent(
llm=llm,
goal=Goal(
objective="Compare Python async libraries",
success_criteria=["Covers asyncio, trio, and curio",
"Includes pros and cons for each",
"Provides a recommendation",],
max_steps=5,
),
)
result = agent.run()
print(result.goal_status) # "completed" | "partial" | "failed"
print(result.criteria_met) # [True, True, True]
print(result.steps_taken) # 3
With built-in tools + MCP (full power)
agent = GoalAgent.with_builtins(
llm=llm,
mcps=[github_mcp, figma_mcp], # attach any MCP servers
goal=Goal(
objective="Research the top 3 Python web frameworks and create a comparison",
success_criteria=["Covers Django, Flask, and FastAPI",
"Includes performance benchmarks",
"Provides a recommendation based on use case",],
max_steps=8,
),
)
result = agent.run()
# Agent searched the web, analyzed results, wrote comparison
Streaming — watch every step live
for event in agent.stream():
    if event.type == "run_started":
        print(f"GOAL: {event.payload['objective']}")
    elif event.type == "planning_completed":
        print(f"PLAN: {event.payload['subtasks']}")
    elif event.type == "step_started":
        print(f" >> {event.message}")
    elif event.type == "tool_called":
        print(f" TOOL: {event.message}")
    elif event.type == "tool_completed":
        criteria = event.payload.get("criteria_met")
        if criteria:
            print(f" EVAL: {criteria}")
    elif event.type == "run_completed":
        print(f" STATUS: {event.payload.get('status')}")
Inspecting results
result = agent.run()
# Step-by-step progress
for s in result.step_outputs:
    print(f"Step {s['step']}: {s['task'][:60]}...")
    print(f" Output: {s['output'][:100]}...")
# Serialize
print(result.to_dict())
Reflective Agent — Self-Improvement Loop
ReflectiveAgent produces output, critically reflects on it, and revises until quality meets the threshold. The agent becomes its own reviewer.
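The draft, reflect, revise loop described above can be sketched without any LLM. This is an illustrative sketch, not the library's code: `reflect_and_revise`, `score`, `revise`, and `length_score` are hypothetical names, and the reviewer is a toy function that rewards longer text. The parameter names `quality_threshold` and `max_reflections` mirror the usage examples in this section.

```python
from typing import Callable, List, Tuple


def reflect_and_revise(
    draft: str,
    score: Callable[[str], Tuple[float, str]],
    revise: Callable[[str, str], str],
    quality_threshold: float = 0.8,
    max_reflections: int = 3,
) -> dict:
    """Hypothetical sketch: score a draft, revise on feedback until good enough."""
    revisions: List[str] = [draft]
    quality, feedback = score(draft)
    iterations = 0
    while quality < quality_threshold and iterations < max_reflections:
        draft = revise(draft, feedback)
        revisions.append(draft)
        iterations += 1
        quality, feedback = score(draft)
    return {"final_quality": quality, "iterations": iterations, "revisions": revisions}


def length_score(text: str) -> Tuple[float, str]:
    # Toy reviewer: quality grows with length and is capped at 1.0.
    return min(len(text) / 40, 1.0), "add more detail"


outcome = reflect_and_revise(
    draft="Sorting orders items.",
    score=length_score,
    revise=lambda text, feedback: text + " It compares pairs.",
)
print(outcome["iterations"], len(outcome["revisions"]))
```

Because the initial draft is stored before any revision, the number of stored versions in this sketch is always one more than the number of iterations.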
graph TD
A[Start] --> B[Draft Output]
B --> C[Reflect & Score]
C --> D{Quality >= Threshold?}
D -- No --> E[Revise Output]
E --> C
D -- Yes --> F[Final Output]
Basic usage
from shipit_agent.deep import ReflectiveAgent
agent = ReflectiveAgent(
llm=llm,
task="Write a high-performance sorting algorithm",
rubric=["Must run in O(n log n) time or better", "Must use strict typing"],
)
result = agent.run()
print(result.final_quality) # 0.92
print(result.iterations) # 2
print(len(result.revisions)) # 3 (initial + 2 revisions)
With built-in tools — research and verify
agent = ReflectiveAgent.with_builtins(
llm=llm,
mcps=[my_mcp_server],
reflection_prompt="Verify facts by searching the web. Check accuracy and completeness.",
max_reflections=3,
quality_threshold=0.85,
)
result = agent.run("Write a guide on deploying FastAPI to AWS Lambda")
# Agent writes -> reflects -> searches web to verify -> revises
Streaming reflections
for event in agent.stream("Write a technical guide"):
    if event.type == "reasoning_started":
        print("REFLECTING...")
    elif event.type == "reasoning_completed":
        print(f" Quality: {event.payload['quality']:.2f}")
        print(f" Feedback: {event.payload['feedback'][:100]}")
    elif event.type == "run_completed":
        print(f" FINAL QUALITY: {event.payload.get('quality')}")
Inspecting reflections
result = agent.run("Explain transformers")
for i, r in enumerate(result.reflections, 1):
    print(f"Reflection {i}: quality={r.quality_score:.2f}")
    print(f" Feedback: {r.feedback[:100]}")
    print(f" Needs revision: {r.revision_needed}")
# Each revision
for i, rev in enumerate(result.revisions):
    print(f"Version {i + 1}: {rev[:100]}...")
Adaptive Agent — Runtime Tool Creation
AdaptiveAgent can create new tools at runtime when it needs capabilities it doesn't have. Code is auto-dedented so it works cleanly in notebooks.
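To make the auto-dedent remark concrete, here is a minimal sketch of how a tool could be built from a source string using only the standard library. It is an assumption about the general technique, not a description of how AdaptiveAgent works internally: `build_tool` is a hypothetical helper, and a real implementation would need sandboxing, since `exec` runs arbitrary code.

```python
import textwrap
from typing import Callable


def build_tool(name: str, code: str) -> Callable:
    """Hypothetical sketch: dedent source, execute it, return the named function."""
    namespace: dict = {}
    # textwrap.dedent strips the common leading whitespace that indented
    # triple-quoted strings (for example in notebook cells) add to every line.
    exec(textwrap.dedent(code), namespace)
    if name not in namespace or not callable(namespace[name]):
        raise ValueError(f"code does not define a callable named {name!r}")
    return namespace[name]


double = build_tool(
    "double",
    """
    def double(n: int) -> str:
        return str(n * 2)
    """,
)
print(double(21))
```

Without the dedent step, the indented `def` inside the string would raise an `IndentationError` when executed.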
Creating tools dynamically
from shipit_agent.deep import AdaptiveAgent
from shipit_agent.tools.base import ToolContext
agent = AdaptiveAgent(llm=llm, can_create_tools=True)
# Create a Fibonacci tool
fib = agent.create_tool(
name="fibonacci",
description="Calculate the Nth Fibonacci number",
code="""
def fibonacci(n: int) -> str:
    if n <= 1:
        return str(n)
    a, b = 0, 1
    for _ in range(2, n + 1):
        a, b = b, a + b
    return str(b)
""",
)
# Test it directly
print(fib.run(ToolContext(prompt="test"), n=10).text) # "55"
# Create a temperature converter
agent.create_tool(
name="celsius_to_fahrenheit",
description="Convert Celsius to Fahrenheit",
code="""
def celsius_to_fahrenheit(celsius: float) -> str:
    return f"{celsius}C = {celsius * 9/5 + 32:.1f}F"
""",
)
# All created tools are available for agent runs
print(agent.created_tools) # [CreatedTool(name="fibonacci"), ...]
print(len(agent.tools)) # 2
result = agent.run("What is fibonacci of 20?")
With built-in tools
# Full power — web search + code exec + dynamic tool creation
agent = AdaptiveAgent.with_builtins(llm=llm, can_create_tools=True)
agent.create_tool("csv_parser", "Parse CSV", "def csv_parser(path: str) -> str: ...")
result = agent.run("Search for CSV parsing best practices, then analyze data.csv")
Streaming
for event in agent.stream("Analyze the dataset"):
    print(f"[{event.type}] {event.message}")
Supervisor — Hierarchical Agent Management
Supervisor plans work, delegates to specialized workers, reviews quality, and can send work back for revision. It goes beyond AgentTeam by actively monitoring the quality of each worker's output.
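The delegate, review, and send-back behaviour described above can be sketched as a small loop. Everything here is hypothetical and simplified: `supervise`, `approve`, and the stub `writer` are illustrative names, the reviewer is a plain function instead of an LLM, and the record fields only mirror the delegation attributes shown later in this section.

```python
from typing import Callable, Dict, List


def supervise(
    tasks: List[dict],
    workers: Dict[str, Callable[[str], str]],
    approve: Callable[[str], bool],
    max_delegations: int = 6,
) -> List[dict]:
    """Hypothetical sketch: delegate each task, review, and resend rejected work."""
    delegations: List[dict] = []
    queue = list(tasks)
    while queue and len(delegations) < max_delegations:
        job = queue.pop(0)
        output = workers[job["worker"]](job["task"])
        approved = approve(output)
        delegations.append({
            "round": len(delegations) + 1,
            "worker": job["worker"],
            "task": job["task"],
            "output": output,
            "approved": approved,
        })
        if not approved:
            # Send the work back, with the rejected output as context.
            queue.insert(0, {
                "worker": job["worker"],
                "task": f"{job['task']} (revise: {output})",
            })
    return delegations


def writer(task: str) -> str:
    # Stub worker: the first answer is too short, the revision is longer.
    return "long detailed post" if "revise" in task else "post"


log = supervise(
    tasks=[{"worker": "writer", "task": "Draft a post"}],
    workers={"writer": writer},
    approve=lambda output: len(output) > 10,
)
for d in log:
    print(d["round"], d["worker"], d["approved"])
```

The `max_delegations` cap bounds the total number of rounds, so a worker that never satisfies the reviewer cannot loop indefinitely.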
Quick setup with built-in tools
from shipit_agent.deep import Supervisor, Worker
supervisor = Supervisor.with_builtins(
llm=llm,
worker_configs=[{
"name": "trend-researcher",
"prompt": "You are a trend researcher. Search the web for trends. Return bullet points.",
"capabilities": ["web search", "research"],
},
{
"name": "content-writer",
"prompt": "You write viral social media posts with emojis and hashtags.",
"capabilities": ["writing", "social media"],
},
{
"name": "reviewer",
"prompt": "You review content for engagement and brand safety. Rate 1-10.",
"capabilities": ["review", "quality"],
},],
mcps=[my_mcp_server], # all workers get MCP access
max_delegations=6,
)
result = supervisor.run("Create 3 viral posts about AI agents for developers")
Manual worker setup (more control)
from shipit_agent import Agent
analyst = Worker(
name="data-analyst",
agent=Agent.with_builtins(llm=llm, prompt="You are a data analyst."),
capabilities=["data", "statistics", "pandas"],
)
writer = Worker(
name="report-writer",
agent=Agent(llm=llm, prompt="You write executive summaries."),
capabilities=["writing", "reports"],
)
supervisor = Supervisor(llm=llm, workers=[analyst, writer], max_delegations=5)
result = supervisor.run("Analyze AI adoption trends and write a summary")
Inspecting delegations
result = supervisor.run("Build a report")
for d in result.delegations:
    print(f"Round {d.round}: [{d.worker}]")
    print(f" Task: {d.task[:80]}...")
    print(f" Output: {d.output[:120]}...")
    print(f" Approved: {d.approved}")
print(f"Total rounds: {result.total_rounds}")
print(result.to_dict()) # serialize
Streaming — watch delegation in real time
for event in supervisor.stream("Create a marketing report"):
    worker = event.payload.get("worker", "supervisor")
    if event.type == "planning_started":
        print("[supervisor] Planning next step...")
    elif event.type == "tool_called":
        print(f"[supervisor] Delegating to {worker}: {event.message}")
    elif event.type == "tool_completed":
        print(f"[{worker}] Done: {event.payload.get('output', '')[:80]}")
    elif event.type == "run_completed":
        print(f"[supervisor] Finished in {event.payload.get('rounds')} rounds")
Persistent Agent — Checkpoint & Resume
PersistentAgent saves progress periodically so long-running tasks survive crashes, timeouts, or user interruptions.
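Periodic checkpointing of this kind can be illustrated with a JSON file per agent ID. This is a sketch under assumptions, not PersistentAgent's actual storage format: `run_steps`, the `crash_at` test hook, and the file layout are all hypothetical. In this sketch, any work done after the last checkpoint is repeated on resume; the source does not say whether the library behaves the same way.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def run_steps(agent_id: str, checkpoint_dir: Path, max_steps: int,
              checkpoint_interval: int = 5, crash_at: Optional[int] = None) -> dict:
    """Hypothetical sketch: resume from the last checkpoint and save periodically."""
    path = checkpoint_dir / f"{agent_id}.json"
    # Resume from disk if a checkpoint exists, otherwise start fresh.
    if path.exists():
        state = json.loads(path.read_text())
    else:
        state = {"steps_done": 0, "outputs": []}
    while state["steps_done"] < max_steps:
        if crash_at is not None and state["steps_done"] == crash_at:
            raise RuntimeError("simulated crash")
        state["outputs"].append(f"step {state['steps_done'] + 1} output")
        state["steps_done"] += 1
        if state["steps_done"] % checkpoint_interval == 0:
            path.write_text(json.dumps(state))
    path.write_text(json.dumps(state))  # final save on completion
    return state


with tempfile.TemporaryDirectory() as tmp:
    try:
        run_steps("ai-report", Path(tmp), max_steps=20, crash_at=12)
    except RuntimeError:
        pass  # steps 11 and 12 ran but were never checkpointed
    resumed = run_steps("ai-report", Path(tmp), max_steps=20)
    print(resumed["steps_done"], len(resumed["outputs"]))
```

A smaller `checkpoint_interval` loses less work on a crash at the cost of more frequent writes.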
Basic usage
from shipit_agent.deep import PersistentAgent
agent = PersistentAgent(
llm=llm,
tools=[web_search, code_exec],
checkpoint_dir="./checkpoints",
checkpoint_interval=5, # save every 5 steps
max_steps=50,
)
# Start a long task
result = agent.run("Write a 50-page report on AI regulations", agent_id="ai-report")
# If interrupted, resume exactly where you left off
result = agent.resume(agent_id="ai-report")
Check progress without resuming
status = agent.status("ai-report")
# {"state": "paused", "agent_id": "ai-report", "steps_done": 20, "outputs_count": 20}
# Or "not_found" if no checkpoint exists
status = agent.status("nonexistent")
# {"state": "not_found"}
Agent Communication — Typed Channels
Channel enables structured, typed message passing between agents. Every message has a sender, receiver, type, structured data, and optional acknowledgment.
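One way to picture such a channel is a FIFO queue per recipient plus a shared history list. The sketch below is a self-contained approximation, not the library's `Channel`: `Message` and `SimpleChannel` are hypothetical classes whose fields and methods merely mirror the names used in the examples in this section.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, Deque, Dict, List, Optional


@dataclass
class Message:
    from_agent: str
    to_agent: str
    type: str
    data: Dict[str, Any] = field(default_factory=dict)
    requires_ack: bool = False
    acknowledged: bool = False


class SimpleChannel:
    """Hypothetical sketch: one FIFO queue per recipient plus a shared history."""

    def __init__(self) -> None:
        self._queues: Dict[str, Deque[Message]] = defaultdict(deque)
        self._history: List[Message] = []

    def send(self, msg: Message) -> None:
        self._queues[msg.to_agent].append(msg)
        self._history.append(msg)

    def receive(self, agent: str) -> Optional[Message]:
        # Oldest message first; None when nothing is waiting.
        queue = self._queues[agent]
        return queue.popleft() if queue else None

    def ack(self, msg: Message) -> None:
        msg.acknowledged = True

    def pending(self, agent: str) -> int:
        return len(self._queues[agent])

    def history(self) -> List[Message]:
        return list(self._history)


chan = SimpleChannel()
chan.send(Message("manager", "dev1", "task", {"work": "build API"}, requires_ack=True))
chan.send(Message("manager", "dev1", "task", {"work": "fix bug"}))
first = chan.receive("dev1")
chan.ack(first)
print(first.data["work"], chan.pending("dev1"), first.acknowledged)
```

Keeping the history separate from the queues means received messages remain inspectable after they have been consumed.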
Full pipeline example
from shipit_agent.deep import Channel, AgentMessage
from shipit_agent import Agent
channel = Channel(name="content-pipeline")
# --- Step 1: Researcher gathers info ---
researcher = Agent.with_builtins(llm=llm, prompt="You are a researcher.")
research = researcher.run("Find 5 key facts about WebAssembly adoption")
channel.send(AgentMessage(
from_agent="researcher",
to_agent="writer",
type="research_complete",
data={
"findings": research.output,
"word_count": len(research.output.split()),
"confidence": 0.92,
},
requires_ack=True,
))
# --- Step 2: Writer receives, acknowledges, writes ---
msg = channel.receive(agent="writer")
print(f"From: {msg.from_agent}, Type: {msg.type}")
print(f"Confidence: {msg.data['confidence']}")
channel.ack(msg) # acknowledge receipt
writer = Agent(llm=llm, prompt="You are a technical writer.")
draft = writer.run(f"Write a summary using: {msg.data['findings']}")
channel.send(AgentMessage(
from_agent="writer",
to_agent="reviewer",
type="draft_ready",
data={"draft": draft.output},
))
# --- Step 3: Reviewer receives and reviews ---
review_msg = channel.receive(agent="reviewer")
reviewer = Agent(llm=llm, prompt="You are an editor. Rate 1-10.")
review = reviewer.run(f"Review: {review_msg.data['draft']}")
channel.send(AgentMessage(
from_agent="reviewer",
to_agent="writer",
type="review_complete",
data={"review": review.output, "status": "approved"},
))
# --- Inspect channel ---
print(f"Messages sent: {len(channel.history())}")
print(f"Pending for writer: {channel.pending(agent='writer')}")
for msg in channel.history():
    print(f" {msg.from_agent} -> {msg.to_agent} [{msg.type}] ack={msg.acknowledged}")
Multi-agent queues
channel = Channel()
# Send to different agents
channel.send(AgentMessage(from_agent="manager", to_agent="dev1", type="task", data={"work": "build API"}))
channel.send(AgentMessage(from_agent="manager", to_agent="dev2", type="task", data={"work": "write tests"}))
channel.send(AgentMessage(from_agent="manager", to_agent="dev1", type="task", data={"work": "fix bug"}))
# Each agent gets their own queue
print(channel.pending(agent="dev1")) # 2
print(channel.pending(agent="dev2")) # 1
# Messages are FIFO per agent
msg1 = channel.receive(agent="dev1") # "build API"
msg2 = channel.receive(agent="dev1") # "fix bug"
Message serialization
msg = AgentMessage(from_agent="a", to_agent="b", type="data", data={"key": "value"})
print(msg.to_dict())
# {"from": "a", "to": "b", "type": "data", "data": {"key": "value"}, ...}
Agent Benchmarking — Systematic Testing
AgentBenchmark tests your agents systematically with expected outputs, tool usage, and content checks.
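The three kinds of check mentioned above (required content, forbidden content, and tool usage) can be sketched as a single function that returns failure messages. This is a hypothetical illustration, not AgentBenchmark's logic: `Case` and `check_case` are made-up names, and case-insensitive substring matching is an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Case:
    input: str
    expected_contains: List[str] = field(default_factory=list)
    expected_not_contains: List[str] = field(default_factory=list)
    expected_tools: List[str] = field(default_factory=list)


def check_case(case: Case, output: str, tools_used: List[str]) -> List[str]:
    """Hypothetical sketch: return failure messages (an empty list means pass)."""
    text = output.lower()  # case-insensitive matching is an assumption
    failures = [f"missing {s!r}" for s in case.expected_contains
                if s.lower() not in text]
    failures += [f"unexpected {s!r}" for s in case.expected_not_contains
                 if s.lower() in text]
    failures += [f"tool not used: {t}" for t in case.expected_tools
                 if t not in tools_used]
    return failures


case = Case(
    input="Explain REST APIs",
    expected_contains=["http", "resource"],
    expected_not_contains=["graphql"],
)
good = check_case(case, "REST exposes each resource over HTTP.", tools_used=[])
bad = check_case(case, "GraphQL is an alternative to REST over HTTP.", tools_used=[])
print(good, bad)
```

Returning a list of reasons instead of a bare boolean is what makes a per-case failure report possible.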
Define test cases
from shipit_agent.deep import AgentBenchmark, TestCase
benchmark = AgentBenchmark(
name="Programming Knowledge Eval",
cases=[TestCase(
input="What is Python's GIL?",
expected_contains=["global interpreter lock", "thread"],
),
TestCase(
input="Explain REST APIs",
expected_contains=["http", "resource"],
expected_not_contains=["graphql"],
),
TestCase(
input="What are Python decorators?",
expected_contains=["function", "wrap"],
),
TestCase(
input="What is Docker?",
expected_contains=["container"],
expected_tools=["web_search"], # check tool usage
),],
)
Run and get report
agent = Agent.with_builtins(llm=llm)
report = benchmark.run(agent)
print(report.summary())
# Agent Benchmark: Programming Knowledge Eval
# Cases: 4 passed, 0 failed (4 total)
# Pass rate: 100%
# Avg iterations: 1.5
# Tools used: web_search(2)
# [PASS] What is Python's GIL?
# [PASS] Explain REST APIs
# [PASS] What are Python decorators?
# [PASS] What is Docker?
Inspect failures
for r in report.results:
    if not r.passed:
        print(f"FAILED: {r.test_case.input}")
        for f in r.failures:
            print(f" - {f}")
        print(f" Output: {r.output[:200]}")
Export for dashboards
import json
print(json.dumps(report.to_dict(), indent=2))
# {"name": "...", "passed": 4, "failed": 0, "pass_rate": 1.0, "results": [...]}
Combining Deep Agents
Pipeline + Supervisor
from shipit_agent import Pipeline, step
supervisor = Supervisor.with_builtins(llm=llm, worker_configs=[...])
pipe = Pipeline.sequential(
step("research", agent=Agent.with_builtins(llm=llm), prompt="Research {topic}"),
step("produce", fn=lambda text: supervisor.run(f"Create content from: {text}").output),
)
result = pipe.run(topic="AI agents")
GoalAgent + Channel
channel = Channel()
agent = GoalAgent.with_builtins(llm=llm, goal=goal)
for event in agent.stream():
    # Forward events through a channel
    channel.send(AgentMessage(
        from_agent="goal_agent", to_agent="monitor",
        type=event.type, data=event.payload,
    ))
ReflectiveAgent + Benchmark
agent = ReflectiveAgent.with_builtins(llm=llm, quality_threshold=0.9)
# Benchmark the reflective agent
benchmark = AgentBenchmark(name="quality-check", cases=[TestCase(input="Explain quantum computing", expected_contains=["qubit", "superposition"]),])
report = benchmark.run(agent)
Deep Agents with Memory
All deep agents accept a memory parameter. Pass an AgentMemory instance to give agents persistent conversation history, semantic knowledge, and entity tracking across multiple runs.
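To show what storing and searching facts by embedding similarity means in practice, here is a toy semantic store. It is a sketch only: `TinySemanticMemory`, `bag_of_words`, and `cosine` are hypothetical, and the word-count embedding stands in for a real `embedding_fn` that would normally call an embedding model.

```python
import math
from collections import Counter
from typing import Callable, Dict, List, Tuple


def bag_of_words(text: str) -> Dict[str, float]:
    """Toy embedding: lowercase word counts instead of a model-generated vector."""
    return dict(Counter(text.lower().split()))


def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    dot = sum(a[k] * b.get(k, 0.0) for k in a)
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0


class TinySemanticMemory:
    """Hypothetical sketch: store facts with embeddings, search by similarity."""

    def __init__(self, embedding_fn: Callable[[str], Dict[str, float]]) -> None:
        self.embedding_fn = embedding_fn
        self.facts: List[Tuple[str, Dict[str, float]]] = []

    def add_fact(self, fact: str) -> None:
        self.facts.append((fact, self.embedding_fn(fact)))

    def search(self, query: str, top_k: int = 1) -> List[str]:
        q = self.embedding_fn(query)
        ranked = sorted(self.facts, key=lambda item: cosine(q, item[1]), reverse=True)
        return [fact for fact, _ in ranked[:top_k]]


store = TinySemanticMemory(embedding_fn=bag_of_words)
store.add_fact("Q4 2025 revenue: $2.4M, up 15% YoY")
store.add_fact("Customer satisfaction: 92%")
print(store.search("customer satisfaction score"))
```

Because the store outlives any single run, a second agent given the same object can retrieve facts that the first one added.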
GoalAgent with memory — remembers previous goals
from shipit_agent import AgentMemory, ConversationMemory, SemanticMemory, EntityMemory, Entity, InMemoryVectorStore
# Create shared memory
memory = AgentMemory(
conversation=ConversationMemory(strategy="buffer"),
knowledge=SemanticMemory(vector_store=InMemoryVectorStore(), embedding_fn=embed),
entities=EntityMemory(),
)
# Run 1: Research frameworks
agent1 = GoalAgent(
llm=llm,
memory=memory,
goal=Goal(objective="List top 3 Python web frameworks", success_criteria=["Names Django, Flask, FastAPI"]),
)
result1 = agent1.run()
# Store findings in memory
memory.add_fact(f"Previous research: {result1.output[:500]}")
memory.add_entity(Entity(name="Django", entity_type="framework", context="Python web framework"))
memory.add_entity(Entity(name="FastAPI", entity_type="framework", context="Modern async framework"))
# Run 2: Agent remembers Run 1!
agent2 = GoalAgent(
llm=llm,
memory=memory, # same memory — agent sees previous conversation
goal=Goal(objective="Compare the performance of the frameworks you found earlier", success_criteria=["References Django, Flask, FastAPI"]),
)
result2 = agent2.run()
# Agent has context from Run 1 and can reference previous findings
ReflectiveAgent with memory — learns from reflections
memory = AgentMemory.default(llm=llm, embedding_fn=embed)
reflective = ReflectiveAgent(
llm=llm,
memory=memory,
reflection_prompt="Check accuracy and completeness.",
quality_threshold=0.8,
)
# Run 1: Explain REST
result1 = reflective.run("Explain what a REST API is")
memory.add_fact(f"REST explanation quality: {result1.final_quality:.2f}")
# Run 2: Compare with GraphQL — agent remembers the REST explanation
result2 = reflective.run("Now explain GraphQL and compare it to REST")
# Agent has full REST context from memory
Supervisor with shared memory — workers share knowledge
# Pre-load business data
team_memory = AgentMemory.default(embedding_fn=embed)
team_memory.add_fact("Q4 2025 revenue: $2.4M, up 15% YoY")
team_memory.add_fact("Customer satisfaction: 92%")
team_memory.add_entity(Entity(name="Product Atlas", entity_type="product", context="Main SaaS platform"))
# Workers share memory via history
analyst = Worker(
name="analyst",
agent=Agent(llm=llm, prompt="You are a data analyst.", history=team_memory.get_conversation_messages()),
)
writer = Worker(
name="writer",
agent=Agent(llm=llm, prompt="You write reports.", history=team_memory.get_conversation_messages()),
)
supervisor = Supervisor(llm=llm, workers=[analyst, writer])
result = supervisor.run("Write a Q4 executive summary using the data we have")
How memory flows through deep agents
AgentMemory
├── ConversationMemory
│ └── Messages from all previous runs are injected as agent history
├── SemanticMemory
│ └── Facts stored/searched by embedding similarity
└── EntityMemory
└── People, projects, concepts tracked across sessions
GoalAgent(memory=memory)
└── _build_agent() injects memory.get_conversation_messages() as history
└── _save_to_memory() stores each step's output
└── Result: agent sees full conversation context from prior runs
ReflectiveAgent(memory=memory)
└── Same pattern — history injected, outputs saved
└── Store reflection feedback as facts for future runs
Supervisor(workers=[...])
└── Each worker gets history= from shared memory
└── Workers can access pre-loaded facts and entities
Notebook
See notebooks/18_deep_agents_with_memory.ipynb for full runnable examples with Bedrock.