Streaming

Real-time event streaming from shipit_agent.Agent — event types, terminal renderers, SSE and WebSocket transports.

2 min read
8 sections
Edit this page

agent.stream(prompt) is a generator that yields AgentEvents the instant they're emitted. There is no buffering: each tool_called event arrives before the tool runs, each tool_completed arrives the moment the tool returns, and run_completed is the very last event.

Event types

TypeEmitted whenUseful payload fields
run_startedThe agent receives a user promptprompt
step_startedThe runtime begins an LLM iterationiteration, tool_count
reasoning_startedThe model surfaces a thinking blockiteration
reasoning_completedThe thinking block is finalisedcontent
planning_startedAuto-planner is invoked
planning_completedPlanner output is readyplan
tool_calledA tool is about to runtool_name, arguments
tool_completedA tool returned successfullytool_name, metadata, output
tool_failedA tool raisedtool_name, error
interactive_requestThe agent needs the human to answerquestion, options
mcp_attachedAn MCP server has been wired inserver_name
llm_retryThe LLM call is being retriedattempt, error
tool_retryA tool call is being retriedattempt, error
context_snapshotToken usage updateusage, compaction_ratio
rag_sourcesRAG sources captured during the runsources
run_completedThe run is overoutput, iterations

See the Event Types reference for the complete schema.


Minimal example

python
for event in agent.stream("Search the web for SQLite news"):
    print(f"[{event.type}] {event.message}")

Coloured terminal renderer

python
RESET = "\033[0m"
DIM   = "\033[2m"
BOLD  = "\033[1m"
CYAN  = "\033[36m"
GREEN = "\033[32m"
YELL  = "\033[33m"

for event in agent.stream("Find today's BTC price"):
    if event.type == "run_started":
        print(BOLD + "🚀 run started" + RESET)
    elif event.type == "step_started":
        print(DIM + f"  · iter {event.payload.get('iteration')}" + RESET)
    elif event.type == "reasoning_started":
        print(YELL + "  🧠 thinking…" + RESET)
    elif event.type == "reasoning_completed":
        print(YELL + "  🧠 " + event.payload.get('content', '')[:80] + RESET)
    elif event.type == "tool_called":
        print(CYAN + "  ▶ " + event.message + RESET)
    elif event.type == "tool_completed":
        print(GREEN + "  ✓ " + event.message + RESET)
    elif event.type == "rag_sources":
        for s in event.payload.get("sources", []):
            print(DIM + f"    📎 [{s['index']}] {s['source']}" + RESET)
    elif event.type == "run_completed":
        print(BOLD + "✅ done" + RESET)
        print((event.payload.get('output') or '')[:300])

examples/02_streaming_with_reasoning.py ships a more polished version of this you can copy verbatim.


Server-Sent Events (SSE)

For web UIs, every event has a built-in SSE encoder (shipit_agent.packets.sse_event_packet):

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from shipit_agent import Agent
from shipit_agent.packets import sse_event_packet, sse_result_packet

app = FastAPI()
agent = Agent.with_builtins(llm=llm)

@app.get("/stream")
async def stream(q: str):
    def gen():
        for event in agent.stream(q):
            yield sse_event_packet(event)
        # Final marker — useful for clients that watch for `event: done`
        yield "event: done\ndata: {}\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")

The browser side reads it with EventSource("/stream?q=…") and renders events as they arrive.


WebSocket

python
from shipit_agent.packets import websocket_event_packet

@app.websocket("/ws")
async def ws(websocket):
    await websocket.accept()
    user_msg = await websocket.receive_text()
    for event in agent.stream(user_msg):
        await websocket.send_json(websocket_event_packet(event))

websocket_event_packet returns a JSON-friendly dict; pair it with send_json for a clean transport.


Streaming inside a chat session

AgentChatSession.stream mirrors Agent.stream but also persists each turn to the session store:

python
session = agent.chat_session(session_id="user-42")

for event in session.stream("Hi, what can you do?"):
    print(event.message)

# Next turn — same session, same history
for event in session.stream("Search the web for SQLite news"):
    print(event.message)

Subscribe to events programmatically with session.add_event_callback or session.add_packet_callback if you want a callback API instead of a generator.


Stopping a stream

A for event in agent.stream(...): loop can be exited with break — the runtime cleans up the background thread automatically. For explicit cancellation from another thread, raise StopIteration or close the generator (stream.close()).


See also