Streaming & Real-time Events
AgentFlow streams agent execution over Server-Sent Events (SSE). The live chat stream is a real-time view of the run; durable conversation state is written as the run progresses so clients can reconnect, recover UI state, and avoid duplicate submissions after navigation or network loss.
Event format
REST streaming uses the wire event types emitted by the backend: start, delta, end, error, approvals, questions, refinement events, and artifact lifecycle events. Text chunks are delta events. Final results are end events. There is no content event type; content is a field on each event payload.
Key fields
| Field | Purpose |
|---|---|
| type | Wire event type |
| call_id | Unique identifier for this execution unit |
| parent_call_id | Parent execution ID, such as the agent that invoked a tool or sub-agent |
| root_call_id | Root execution ID for this event tree. For chat this is usually the root agent run; direct tool or maintenance streams can have their own root. |
| content | Event payload. For text delta events this is a string; for start, end, approval, question, and error events this is usually an object. |
| seq | Monotonically increasing conversation sequence for deterministic ordering |
| timestamp | Server timestamp |
| metadata | Display hints and content details, such as content_type, display_name, icon data, and reasoning metadata |
seq is the application ordering key inside the JSON payload. AgentFlow also emits SSE transport id: lines when an event has event_id, sse_id, or seq; use the transport id for Last-Event-ID reconnect hints.
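As a minimal sketch of using seq as the ordering key, the helper below sorts a batch of decoded event payloads and skips anything at or below a cursor the client has already applied. The name order_new_events is hypothetical, and the sketch assumes each event carries a distinct integer seq.

```python
from typing import Any, Iterable, Optional


def order_new_events(
    events: Iterable[dict[str, Any]], last_seq: Optional[int] = None
) -> list[dict[str, Any]]:
    """Return events newer than last_seq, sorted by their seq field."""
    fresh = [
        event
        for event in events
        if last_seq is None or event["seq"] > last_seq
    ]
    # sorted() is stable, so events keep arrival order if seq values ever tie.
    return sorted(fresh, key=lambda event: event["seq"])
```

Passing the highest seq already rendered as last_seq lets a client apply a replayed batch without drawing the same event twice.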
Call hierarchy
root_call_id stays constant for one execution tree. parent_call_id points to the execution unit that caused the current event.
Use call_id for cancellation and per-execution display; use root_call_id to correlate the whole run in logs, timelines, and traces.
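The hierarchy can be rebuilt on the client from the three ID fields alone. The sketch below is one possible approach, not the SDK's implementation: build_call_tree is a hypothetical helper, and it assumes the root execution has no parent_call_id (missing or null).

```python
from collections import defaultdict
from typing import Any, Optional


def build_call_tree(events: list[dict[str, Any]]) -> dict[Optional[str], list[str]]:
    """Map each parent_call_id to the call_ids it started, in first-seen order."""
    children: dict[Optional[str], list[str]] = defaultdict(list)
    seen: set[str] = set()
    for event in events:
        call_id = event["call_id"]
        if call_id in seen:
            # Later events for the same execution unit do not add new nodes.
            continue
        seen.add(call_id)
        children[event.get("parent_call_id")].append(call_id)
    return dict(children)
```

The entry under the None key holds the root execution; every other key is a call_id whose children can be nested beneath it in a timeline.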
Event types
| Type | When emitted |
|---|---|
| start | An agent, sub-agent, or tool begins execution |
| delta | Incremental streamed output |
| end | Execution unit completed; root end contains the final result and metrics |
| error | Execution failed or the SSE layer encountered an error |
| approval_required | A tool with approval gating is waiting for human review |
| approval_approved | Pending approval was approved |
| approval_denied | Pending approval was denied |
| approval_timeout | Pending approval expired |
| approval_escalated | Approval was escalated |
| approval_bypassed | Approval was bypassed |
| question_required | Agent paused to ask the user for clarification |
| question_answered | User answered the question |
| question_timeout | Question expired without a response |
| refinement | Reflection rejected a draft response and the agent is refining its answer |
| artifact_started | Artifact generation or update began |
| artifact_progress | Artifact generation made incremental progress |
| artifact_completed | Artifact payload was persisted and is ready to fetch |
| artifact_error | Artifact generation failed |
Model reasoning streams as delta events with metadata.content_type: "reasoning". Visible assistant text streams as delta events with a text or markdown content type.
verbose=true includes nested tool and sub-agent lifecycle events. verbose=false suppresses nested execution chatter from the live stream, but root lifecycle events, user-interaction events such as approvals and questions, and explicit live progress/result deltas still surface. Persisted conversation timelines can still show tool/sub-agent containers after the fact.
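A client that renders reasoning separately from visible text might route delta events as sketched below. classify_delta is a hypothetical helper, and the literal values "text" and "markdown" are assumptions about how the visible content types are spelled on the wire.

```python
from typing import Any


def classify_delta(event: dict[str, Any]) -> str:
    """Return "reasoning", "text", or "other" for a decoded wire event."""
    if event.get("type") != "delta":
        return "other"
    content_type = (event.get("metadata") or {}).get("content_type")
    if content_type == "reasoning":
        return "reasoning"
    # Assumed spellings of the visible-text content types.
    if content_type in {"text", "markdown"}:
        return "text"
    return "other"
```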
Live progress and result deltas
Long-running tools and artifacts can stream structured delta events while they run. AgentFlow uses four exact live content types:
| Content type | SDK event | Purpose |
|---|---|---|
| tool_progress | ToolProgress | What a running tool is doing |
| tool_result_delta | ToolResultDelta | Partial user-facing output from a running tool |
| artifact_progress | ArtifactProgress | What artifact generation is doing |
| artifact_result_delta | ArtifactResultDelta | Partial user-facing artifact output |
Detect live events by metadata.content_type or the mirrored content.type value; UI hints such as show_in_panel do not define whether something is streamable.
delta chunks.
Use artifact_started to open a placeholder, artifact_progress to update streaming UI, artifact_completed to fetch the persisted artifact by ID, and artifact_error to show a failed artifact state. Fetch full artifact payloads through the artifact APIs; the stream event is a lifecycle notification and compact render hint, not the durable artifact record.
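The two checks described in this section can be sketched as small lookups. The function names and the UI state labels ("placeholder", "streaming", "ready", "failed") are illustrative assumptions; only the event types and the four content types come from the tables above.

```python
from typing import Any, Optional

LIVE_CONTENT_TYPES = {
    "tool_progress",
    "tool_result_delta",
    "artifact_progress",
    "artifact_result_delta",
}

# Hypothetical UI states keyed by artifact lifecycle event type.
ARTIFACT_STATES = {
    "artifact_started": "placeholder",
    "artifact_progress": "streaming",
    "artifact_completed": "ready",
    "artifact_error": "failed",
}


def live_content_type(event: dict[str, Any]) -> Optional[str]:
    """Return the live content type of a delta event, or None if it has none."""
    if event.get("type") != "delta":
        return None
    metadata = event.get("metadata") or {}
    content = event.get("content")
    # content is a plain string for text deltas, so only dicts can mirror a type.
    mirrored = content.get("type") if isinstance(content, dict) else None
    candidate = metadata.get("content_type") or mirrored
    return candidate if candidate in LIVE_CONTENT_TYPES else None


def next_artifact_state(event: dict[str, Any]) -> Optional[str]:
    """Return the UI state implied by an artifact lifecycle event, if any."""
    return ARTIFACT_STATES.get(event.get("type"))
```

When next_artifact_state returns "ready", the client would then fetch the persisted artifact through the artifact APIs rather than trusting the stream payload.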
SDK event projection
The Python SDK parses raw wire events into semantic event classes. For example, a wire delta with visible text becomes TextDelta, a reasoning delta becomes ReasoningDelta, and the root end becomes FinalResponse.
Each typed event keeps its original wire payload on event.raw. When you want unprojected backend dictionaries instead of typed events, pass raw=True or call stream_raw(...).
For direct tool runs, the raw equivalent is client.agents.tools.stream_raw(...).
Consuming REST streams
SSE frames can be split across network chunks, can contain comments, retry: hints, optional id: fields, and multiple data: lines. Parse complete SSE events separated by a blank line, concatenate all data: lines with \n, and ignore comments that start with :.
Python (httpx)
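The following is a minimal sketch of the parsing rules above, not an official client. iter_sse_events and stream_json_events are hypothetical names, the endpoint URL and HTTP method are left to the caller, and the sketch assumes every data payload is JSON. The parser itself uses only the standard library; httpx is imported lazily inside the streaming helper.

```python
import json
from typing import Any, Iterable, Iterator, Optional


def parse_sse_block(block: str) -> Optional[dict[str, str]]:
    """Parse one blank-line-delimited SSE block into its fields."""
    fields: dict[str, str] = {}
    data_lines: list[str] = []
    for line in block.split("\n"):
        if not line or line.startswith(":"):
            continue  # blank line or comment
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if name == "data":
            data_lines.append(value)
        elif name in ("id", "event", "retry"):
            fields[name] = value
    if not data_lines:
        return None  # blocks without data (for example a bare retry:) are skipped
    fields["data"] = "\n".join(data_lines)
    return fields


def iter_sse_events(chunks: Iterable[str]) -> Iterator[dict[str, str]]:
    """Yield complete SSE events from text chunks split at arbitrary points."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        tail = ""
        if buffer.endswith("\r"):
            # Hold back a trailing CR; its LF may arrive in the next chunk.
            buffer, tail = buffer[:-1], "\r"
        buffer = buffer.replace("\r\n", "\n")
        while "\n\n" in buffer:
            block, buffer = buffer.split("\n\n", 1)
            event = parse_sse_block(block)
            if event is not None:
                yield event
        buffer += tail


def stream_json_events(method: str, url: str, **request_kwargs: Any) -> Iterator[Any]:
    """Open a streaming request with httpx and yield decoded JSON payloads."""
    import httpx  # third-party dependency, imported lazily

    # timeout=None avoids httpx's default read timeout on long-lived streams.
    with httpx.stream(method, url, timeout=None, **request_kwargs) as response:
        response.raise_for_status()
        for event in iter_sse_events(response.iter_text()):
            yield json.loads(event["data"])
```

Keeping the parser separate from the transport means the same function can be fed from a test fixture, a file, or a different HTTP client.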
Reconnect and resume
Disconnecting from a chat SSE stream only detaches that subscriber from the run. It does not mean the user cancelled the agent. To stop work, call the cancellation API explicitly. For robust UIs, pair chat submission with the durable conversation stream at /api/v1/conversations/{conversation_id}/stream. Open it after navigation, network loss, or a duplicate-submit 409 to rebuild UI state from persisted messages, artifacts, active runs, and the cursor. Active run snapshots include heartbeat metadata such as last_heartbeat_at; clients can use that to show that detached work is still alive.
This recovery stream is snapshot-based. AgentFlow emits stable SSE id: fields for conversation snapshots, such as conversation:{conversation_id}:seq:{max_seq}. Persist the most recent id and send it as Last-Event-ID on reconnect; the server returns a fresh full snapshot with recovery metadata, then continues with changed snapshots while the run is active.
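One way to persist the most recent id and replay it on reconnect is sketched below. ResumeCursor is a hypothetical helper, and it assumes parsed SSE events are dictionaries with an optional "id" key.

```python
from typing import Optional


class ResumeCursor:
    """Remember the latest SSE id so a reconnect can send Last-Event-ID."""

    def __init__(self) -> None:
        self.last_event_id: Optional[str] = None

    def observe(self, sse_event: dict) -> None:
        """Record the id of a parsed SSE event, if it has one."""
        event_id = sse_event.get("id")
        if event_id:
            self.last_event_id = event_id

    def reconnect_headers(self) -> dict:
        """Build request headers for opening or reopening the stream."""
        headers = {"Accept": "text/event-stream"}
        if self.last_event_id is not None:
            headers["Last-Event-ID"] = self.last_event_id
        return headers
```

Because the server answers with a fresh full snapshot, the header acts as a hint for recovery metadata; the client still replaces its local state with whatever snapshot arrives.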
Stable message IDs and idempotency
Generate conversation_id once per conversation and a stable message_id once per user-submitted message. Reuse the same message_id when retrying the same user action after a timeout, refresh, or duplicate submit. Do not mint a fresh message_id for every network retry; that can create duplicate turns.
For direct tool runs, use idempotency_key for side-effecting calls. The key prevents duplicate in-flight direct executions; it is not a durable replay cache for completed results. SDK helpers mint IDs when omitted for convenience, but production chat UIs should pass stable IDs from their own message model.
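A client-side message model can make the stable ID rule hard to break by minting message_id when the user action is created, not when a request is sent. In this sketch, OutgoingMessage and the "message" body field are hypothetical; only conversation_id and message_id are names from this page.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class OutgoingMessage:
    """One user-submitted message; message_id is minted once, at creation."""

    conversation_id: str
    text: str
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def request_body(self) -> dict:
        """Build the same body for the first attempt and for every retry."""
        return {
            "conversation_id": self.conversation_id,
            "message_id": self.message_id,
            "message": self.text,  # hypothetical field name
        }
```

Retrying means calling request_body() again on the same object, so a timeout or double click cannot produce a second message_id for the same turn.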
Detached chat streams
For chat, AgentFlow consumes the agent run in a detached server task and relays events to the current SSE subscriber. If the browser tab closes, the relay detaches and the run keeps going. If the subscriber falls behind the bounded relay queue, live delivery is dropped but durable conversation state continues to be written. Clients should treat the original chat SSE stream as a live convenience, not the source of truth. The durable recovery loop is:
- Submit the chat request with stable conversation_id and message_id.
- Render live events while connected.
- On navigation, network loss, queue overflow, or duplicate-submit 409, open /api/v1/conversations/{conversation_id}/stream.
- Rebuild from conversation_snapshot and apply conversation_update events until the run completes.
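The last step of the loop above can be sketched as a small reducer. The payload shape here is an assumption made for illustration: each event is taken to carry a "messages" list whose items have a "message_id". The real snapshot and update payloads may differ, so treat apply_recovery_event as a hypothetical pattern, not the wire contract.

```python
from typing import Any, Optional


def apply_recovery_event(
    state: Optional[dict[str, Any]], kind: str, payload: dict[str, Any]
) -> Optional[dict[str, Any]]:
    """Fold one recovery-stream event into the client's conversation state."""
    incoming = {m["message_id"]: m for m in payload.get("messages", [])}
    if kind == "conversation_snapshot":
        # A snapshot replaces whatever the client held before.
        return {"messages": incoming}
    if kind == "conversation_update" and state is not None:
        # An update overwrites changed messages and keeps the rest.
        return {"messages": {**state["messages"], **incoming}}
    # Updates that arrive before any snapshot are ignored.
    return state
```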
Cancellation
Cancellation is explicit and cooperative. Disconnecting from a chat stream does not cancel the root run. The primary cancellation handle for a chat run is its root_call_id. conversation_runs are created from the client conversation_id and message_id, so either value can be used as an early cancellation handle once the run row exists.
status: "cancelled"; running tasks with a cancellation handle return status: "cancellation_requested".
The SDK exposes the same contract:
run.cancel() cancels the root call once the stream has observed its root_call_id. You can also call await client.cancellations.cancel(call_id) directly. SDK hooks can request background cancellation too: returning Block(...) from a pre_tool_use hook calls client.cancellations.cancel(event.call_id) for the in-flight tool, and returning Block(...) from an approval hook denies the pending approval.
Cancellation is observed between LLM chunks, tool dispatches, sub-agent turns, and other cooperative checkpoints. A tool or provider call that is already inside a non-interruptible network request may finish before the cancellation signal is observed. Background tasks use their task_id; running foreground executions use call_id.
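The general pattern behind cooperative checkpoints can be illustrated with a generic standard-library sketch. This is not AgentFlow's implementation: run_steps, the step callables, and the "completed" status label are assumptions used to show why a step that is already running finishes before the signal takes effect.

```python
import threading
from typing import Callable


def run_steps(
    steps: list[Callable[[], str]], cancel_event: threading.Event
) -> tuple[list[str], str]:
    """Run steps in order, checking for cancellation only between steps."""
    completed: list[str] = []
    for step in steps:
        if cancel_event.is_set():
            # Cooperative checkpoint: stop before starting more work.
            return completed, "cancelled"
        # A step that has started always runs to completion.
        completed.append(step())
    return completed, "completed"
```

If the cancel signal is set while the second of three steps is executing, that step still returns its result and only the third step is skipped.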
Observability
AgentFlow tags execution spans with call_id, parent_call_id, agent name, role, tenant, user, and request context where available. Root agent and direct root sub-agent runs create Datadog LLM Observability workflow spans; nested tools and sub-agents create child execution spans and LLMObs annotations. The LLMObs session_id is the conversation_id when available, otherwise the root or current call ID.
Use these IDs consistently:
| ID | Use |
|---|---|
| conversation_id | User-facing session and Datadog LLMObs session correlation |
| message_id | The user turn that caused artifacts, tool calls, or errors |
| root_call_id | One execution tree across root agent, sub-agents, and tools |
| parent_call_id | Nesting relationship for timelines and span trees |
| call_id | Cancellation, execution metrics, and per-call logs |
To enable these spans, set DD_TRACE_ENABLED=true, DD_LLMOBS_ENABLED=true, and a lowercase DD_LLMOBS_ML_APP.
Infrastructure
| Feature | Detail |
|---|---|
| Max connections | Per-replica SSE connection cap; new streams return HTTP 503 when full |
| Connection timeout | Long-running streams are bounded by the server’s SSE timeout |
| Retry hint | Streams send an SSE retry: directive before data events |
| Heartbeat | Durable conversation runs expose last_heartbeat_at while background execution is active |
| Nginx buffering | Disabled with X-Accel-Buffering: no |
| Disconnect behavior | Chat and conversation streams detach on client disconnect; explicit cancel endpoints stop work |

