Observability exports

Stream AgentEvent traces out of shipit-agent into LangSmith or any OpenTelemetry backend. Batching, attribute mapping, and silent-failure semantics explained.

4 min read
16 sections
Edit this page

Every agent, autopilot, and crew in shipit-agent already emits a stream of AgentEvent records into the configured TraceStore. Two ready-made exporters let you forward that same stream to an external observability backend without touching your agent code:

ExporterClassBackendTransport
LangSmithLangSmithExporterLangSmith runs APIurllib POST, batched
OpenTelemetryOpenTelemetryExporterAny OTLP collector / Jaeger / Tempo / Honeycomb / Datadogopentelemetry-api / opentelemetry-sdk

Both implement the same TraceStore protocol (append_event, load) so they slot into the runtime identically and can be layered behind a fan-out store if you want both.

Choosing an exporter

Pick LangSmithExporter when:

  • Your team already runs LangSmith for LLM eval / replay and you want shipit traces showing up in the same project.
  • You want zero extra dependencies — the exporter uses stdlib urllib.request, no LangSmith SDK required.
  • You're fine with a 20-span / 2-second batched POST to the public API.

Pick OpenTelemetryExporter when:

  • You already pipe OTLP to Jaeger, Tempo, Honeycomb, Datadog, New Relic, or a self-hosted collector.
  • You want span attributes, correlated trace IDs, and multi-service stitching (e.g. agent span → downstream HTTP client span).
  • You're happy to install opentelemetry-api + opentelemetry-sdk and wire up your own processor / exporter pair.

Both exporters never raise from append_event. Transport errors are logged at WARNING via the standard logging module and the event is dropped — tracing must not be allowed to take a production agent down.


LangSmith

Translates each AgentEvent into a LangSmith run record and POSTs batches to ${api_url}/runs.

Setup

bash
export LANGCHAIN_API_KEY=lsv2_pt_xxxxxxxxxxxxxxxxxxxx
python
from shipit_agent.tracing_exporters.langsmith_exporter import LangSmithExporter

exporter = LangSmithExporter(
    api_key=None,                                 # defaults to LANGCHAIN_API_KEY env
    api_url="https://api.smith.langchain.com",    # override for self-hosted
    project="shipit-agent",
    timeout_seconds=5.0,
    batch_size=20,                                # flush after 20 events
    flush_interval_seconds=2.0,                   # or 2 seconds, whichever first
)

Example — wire into an agent

python
from shipit_agent import Agent
from shipit_agent.llms import OpenAIChatLLM
from shipit_agent.tracing_exporters.langsmith_exporter import LangSmithExporter

exporter = LangSmithExporter(project="shipit-agent/prod")

agent = Agent(
    llm=OpenAIChatLLM(model="gpt-4o-mini"),
    trace_store=exporter,
)
agent.run("Why is the sky blue?")
exporter.flush()       # force the last partial batch out before shutdown

Batching behaviour

A batch is flushed when either of these fires, whichever comes first:

  • The buffer reaches batch_size events (default 20).
  • flush_interval_seconds have elapsed since the first event in the current buffer (default 2.0 s) — measured against time.monotonic(), not wall clock.

Call exporter.flush() explicitly at shutdown; the exporter has no background thread, so a small tail of events can linger in the buffer until the next append_event triggers a time-based flush.

If api_key is None (env var also missing), flush() logs a WARNING with the dropped count and continues — the agent keeps running.

Attribute mapping

Each AgentEvent becomes:

python
{
  "name": event.type,
  "run_type": "chain",
  "inputs":  {"message": event.message},
  "outputs": <event.payload, coerced to JSON-plain>,
  "start_time": <now UTC ISO8601>,
  "end_time":   <now UTC ISO8601>,      # same as start — events are point-in-time
  "extra": {
    "trace_id": <trace_id from append_event>,
    "project":  <exporter.project>,
    "metadata": <append_event metadata arg, JSON-plain>,
  },
}

Dataclasses are converted with asdict, tuples become lists, and anything that isn't str/int/float/bool/None falls back to str(value) so no event shape can break the POST.

Transport failures

urllib.error.URLError, OSError, and TimeoutError on the POST are caught and logged at WARNING as "LangSmithExporter failed to post N run(s) to <url>: <exc>". The batch is dropped — no retry queue. If you need durable guarantees, layer the exporter behind a persistent TraceStore and treat LangSmith as the best-effort mirror.


OpenTelemetry

Emits each AgentEvent as an OTel span on a provided (or default) tracer provider.

Setup

bash
pip install opentelemetry-api opentelemetry-sdk
# plus whatever exporter you actually use, e.g.
pip install opentelemetry-exporter-otlp

The opentelemetry package is imported lazily inside __init__ — it's required at instantiation time, not at module import. If the packages aren't installed, the exporter raises RuntimeError("Install opentelemetry-api and opentelemetry-sdk.").

python
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

provider = TracerProvider(resource=Resource.create({"service.name": "shipit-agent"}))
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(
    endpoint="https://otlp.example.com/v1/traces",
)))
trace.set_tracer_provider(provider)

from shipit_agent.tracing_exporters.otel_exporter import OpenTelemetryExporter

otel = OpenTelemetryExporter(
    service_name="shipit-agent",
    tracer_provider=provider,       # optional — falls back to the global one
)

Example — wire into an agent

python
from shipit_agent import Agent
from shipit_agent.llms import OpenAIChatLLM

agent = Agent(
    llm=OpenAIChatLLM(model="gpt-4o-mini"),
    trace_store=otel,
)
agent.run("Summarise the latest release notes.")
# spans flow into your OTLP collector via the BatchSpanProcessor

Span lifecycle

Because the AgentEvent handed to append_event is already finalised, each span is started and ended inside the same call — there are no context-manager handoffs. That means span duration is effectively zero; these are event-style spans, not duration spans. If you need duration correlation, drive it from your downstream HTTP client spans (which OTel propagates via the trace context).

Attribute mapping

Span name: "agent.<event.type>".

Attributes set on every span:

AttributeSource
shipit.trace_idtrace_id arg to append_event
shipit.event.typeevent.type
shipit.event.messageevent.message
shipit.payload.<key>each entry in event.payload (except events)
shipit.metadata.<key>each entry in the metadata arg

If event.payload["events"] is a list of dicts, each is added as a nested span event via span.add_event(name, attributes=...). name is taken from sub["name"] or sub["type"] (falling back to "event"), and the remaining keys are namespaced as shipit.event.<k>.

Attribute coercion

OTel only accepts str / bool / int / float (or homogeneous sequences of same) as attribute values. Anything else is coerced:

  • Lists / tuples of primitives → passed through as a list.
  • Mixed-type lists → [str(v) for v in value].
  • Anything else → repr(value).

This means no payload shape can make append_event raise — worst case you get a stringified attribute.

Transport failures

OpenTelemetry transport errors are handled by your configured span processor / exporter, not by this class. If the OTLP endpoint is down, the SDK's BatchSpanProcessor queues and retries per its own policy; shipit-agent is unaware. Configure the processor's queue / timeout / retry knobs for your durability requirements.


Layering both

If you want LangSmith for human eval and OTel for infra correlation at the same time, wrap both behind a fan-out store:

python
class FanOutStore:
    def __init__(self, *stores):
        self._stores = stores
    def append_event(self, trace_id, event, metadata=None):
        for s in self._stores:
            s.append_event(trace_id, event, metadata)
    def load(self, trace_id):
        return None

agent = Agent(llm=..., trace_store=FanOutStore(langsmith, otel))