
Streaming & Real-time Events

AgentFlow streams agent execution over Server-Sent Events (SSE). The live chat stream is a real-time view of the run; durable conversation state is written as the run progresses so clients can reconnect, recover UI state, and avoid duplicate submissions after navigation or network loss.

Event format

REST streaming uses the wire event types emitted by the backend: start, delta, end, error, approval events, question events, refinement events, and artifact lifecycle events. Text chunks are delta events, and final results arrive in end events. There is no content event type; content is a field on every event payload.
{
  "type": "delta",
  "call_id": "call_abc123",
  "parent_call_id": null,
  "root_call_id": "call_abc123",
  "content": "Here are the results...",
  "timestamp": "2026-04-28T14:30:01Z",
  "seq": 5,
  "metadata": {
    "display_name": "MainAgent",
    "content_type": "markdown"
  }
}

Key fields

| Field | Purpose |
|---|---|
| type | Wire event type |
| call_id | Unique identifier for this execution unit |
| parent_call_id | Parent execution ID, such as the agent that invoked a tool or sub-agent |
| root_call_id | Root execution ID for this event tree. For chat this is usually the root agent run; direct tool or maintenance streams can have their own root. |
| content | Event payload. For text and reasoning delta events this is a string; for structured live deltas (such as tool_progress) and for start, end, approval, question, and error events it is usually an object. |
| seq | Monotonically increasing conversation sequence number for deterministic ordering |
| timestamp | Server timestamp |
| metadata | Display hints and content details, such as content_type, display_name, icon data, and reasoning metadata |
seq is the application ordering key inside the JSON payload. AgentFlow also emits SSE transport id: lines when an event has event_id, sse_id, or seq; use the transport id for Last-Event-ID reconnect hints.
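Because seq is the ordering key, a client can use it to drop duplicates and restore order when a batch of events arrives out of sequence, for example after a reconnect. This is an illustrative sketch only: order_new_events is a hypothetical helper, it assumes events are already-parsed JSON dicts, and it does not wait for gaps in seq because contiguous values are not guaranteed here.

```python
from typing import Any, Iterable


def order_new_events(
    events: Iterable[dict[str, Any]], last_seq: int
) -> tuple[list[dict[str, Any]], int]:
    """Return unseen events sorted by seq, plus the new high-water mark.

    Hypothetical helper: events without a seq are passed through after the
    sequenced ones, in arrival order.
    """
    fresh: dict[int, dict[str, Any]] = {}
    unsequenced: list[dict[str, Any]] = []
    for event in events:
        seq = event.get("seq")
        if seq is None:
            unsequenced.append(event)
        elif seq > last_seq and seq not in fresh:
            fresh[seq] = event  # keep the first copy of a duplicated seq
    ordered = [fresh[s] for s in sorted(fresh)]
    new_last = max(fresh, default=last_seq)
    return ordered + unsequenced, new_last
```

Store the returned high-water mark with the rendered UI state so a later batch only applies events newer than what is already on screen.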

Call hierarchy

root_call_id stays constant for one execution tree. parent_call_id points to the execution unit that caused the current event:
call_agent_root      parent=null                root=call_agent_root
call_records_agent   parent=call_agent_root     root=call_agent_root
call_search_records  parent=call_records_agent  root=call_agent_root
Direct tool and direct sub-agent runs can be roots of their own event trees. Delegated sub-agent runs are children of the parent agent call. Use call_id for cancellation and per-execution display; use root_call_id to correlate the whole run in logs, timelines, and traces.
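Because every event carries call_id, parent_call_id, and root_call_id, a client can rebuild the execution tree for a timeline from the stream alone. A minimal sketch, assuming parsed event dicts; build_call_trees is a hypothetical helper, not an SDK function.

```python
from collections import defaultdict
from typing import Any, Iterable


def build_call_trees(events: Iterable[dict[str, Any]]) -> dict[str, dict[str, list[str]]]:
    """Return {root_call_id: {call_id: [child call_ids in first-seen order]}}."""
    trees: defaultdict[str, dict[str, list[str]]] = defaultdict(dict)
    for event in events:
        root = event.get("root_call_id")
        call = event.get("call_id")
        parent = event.get("parent_call_id")
        if root is None or call is None:
            continue  # not part of an execution tree
        tree = trees[root]
        tree.setdefault(call, [])  # every call gets a node, even leaves
        if parent is not None:
            children = tree.setdefault(parent, [])
            if call not in children:  # many events share one call_id
                children.append(call)
    return dict(trees)
```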

Event types

| Type | When emitted |
|---|---|
| start | An agent, sub-agent, or tool begins execution |
| delta | Incremental streamed output |
| end | Execution unit completed; the root end contains the final result and metrics |
| error | Execution failed or the SSE layer encountered an error |
| approval_required | A tool with approval gating is waiting for human review |
| approval_approved | Pending approval was approved |
| approval_denied | Pending approval was denied |
| approval_timeout | Pending approval expired |
| approval_escalated | Approval was escalated |
| approval_bypassed | Approval was bypassed |
| question_required | Agent paused to ask the user for clarification |
| question_answered | User answered the question |
| question_timeout | Question expired without a response |
| refinement | Reflection rejected a draft response and the agent is refining its answer |
| artifact_started | Artifact generation or update began |
| artifact_progress | Artifact generation made incremental progress |
| artifact_completed | Artifact payload was persisted and is ready to fetch |
| artifact_error | Artifact generation failed |
Reasoning summaries stream as delta events with metadata.content_type: "reasoning". Visible assistant text streams as delta events with a text or markdown content type. verbose=true includes nested tool and sub-agent lifecycle events. verbose=false suppresses nested execution chatter from the live stream, but root lifecycle events, user-interaction events such as approvals and questions, and explicit live progress/result deltas still surface. Persisted conversation timelines can still show tool/sub-agent containers after the fact.
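A UI that renders reasoning separately from the answer can route string deltas on metadata.content_type. This sketch assumes visible text arrives with content_type "text" or "markdown" and reasoning with "reasoning", as described above; TextAccumulator is an illustrative name, and structured deltas are deliberately ignored.

```python
from collections import defaultdict
from typing import Any

VISIBLE_TEXT_TYPES = {"text", "markdown"}


class TextAccumulator:
    """Collects streamed text per call_id, keeping reasoning apart from answer text."""

    def __init__(self) -> None:
        self.reasoning: dict[str, list[str]] = defaultdict(list)
        self.answer: dict[str, list[str]] = defaultdict(list)

    def feed(self, event: dict[str, Any]) -> None:
        if event.get("type") != "delta" or not isinstance(event.get("content"), str):
            return  # structured deltas (tool/artifact progress) are handled elsewhere
        content_type = (event.get("metadata") or {}).get("content_type")
        call_id = event.get("call_id", "")
        if content_type == "reasoning":
            self.reasoning[call_id].append(event["content"])
        elif content_type in VISIBLE_TEXT_TYPES:
            self.answer[call_id].append(event["content"])

    def answer_text(self, call_id: str) -> str:
        # .get() avoids creating empty entries for unknown call_ids
        return "".join(self.answer.get(call_id, []))
```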

Live progress and result deltas

Long-running tools and artifacts can stream structured delta events while they run. AgentFlow uses four exact live content types:
| Content type | SDK event | Purpose |
|---|---|---|
| tool_progress | ToolProgress | What a running tool is doing |
| tool_result_delta | ToolResultDelta | Partial user-facing output from a running tool |
| artifact_progress | ArtifactProgress | What artifact generation is doing |
| artifact_result_delta | ArtifactResultDelta | Partial user-facing artifact output |
Progress is operational status. Result deltas are partial output and may be replaced or collapsed once the final result arrives. Clients should classify these events from metadata.content_type or the mirrored content.type value; UI hints such as show_in_panel do not define whether something is streamable.
{
  "type": "delta",
  "call_id": "tool_call_123",
  "content": {
    "type": "tool_result_delta",
    "result_type": "vapi_call_transcript",
    "transcript": [{ "role": "assistant", "text": "Hello, this is Alex." }]
  },
  "metadata": {
    "content_type": "tool_result_delta",
    "display_name": "Make Call"
  }
}
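The classification rule for live deltas (metadata.content_type first, then the mirrored content.type) can be sketched as follows. LivePanel and live_content_type are hypothetical helpers. The sketch keeps only the latest progress and result-delta payload per call_id, assuming each result delta is self-contained like the transcript example; if a tool emits incremental fragments, append them instead. Live state is cleared on the call's end event, when the final result supersedes it.

```python
from typing import Any, Optional

LIVE_TYPES = {"tool_progress", "tool_result_delta", "artifact_progress", "artifact_result_delta"}
PROGRESS_TYPES = {"tool_progress", "artifact_progress"}


def live_content_type(event: dict[str, Any]) -> Optional[str]:
    """Classify a delta by metadata.content_type, falling back to content.type."""
    if event.get("type") != "delta":
        return None
    kind = (event.get("metadata") or {}).get("content_type")
    if kind is None and isinstance(event.get("content"), dict):
        kind = event["content"].get("type")
    return kind if kind in LIVE_TYPES else None


class LivePanel:
    """Latest progress and partial result per call_id, collapsed on end."""

    def __init__(self) -> None:
        self.progress: dict[str, Any] = {}
        self.partial: dict[str, Any] = {}

    def feed(self, event: dict[str, Any]) -> None:
        call_id = event.get("call_id", "")
        if event.get("type") == "end":
            # The final result supersedes partial output, so drop the live state.
            self.progress.pop(call_id, None)
            self.partial.pop(call_id, None)
            return
        kind = live_content_type(event)
        if kind in PROGRESS_TYPES:
            self.progress[call_id] = event["content"]  # operational status
        elif kind is not None:
            self.partial[call_id] = event["content"]  # partial user-facing output
```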
Artifacts are first-class lifecycle events. The backend strips internal artifact markers from visible text deltas and emits structured artifact events instead. Clients should not parse inline XML or marker text from delta chunks.
{
  "type": "artifact_completed",
  "call_id": "call_abc123",
  "content": {
    "artifact_type": "draft_email",
    "artifact_id": "art_abc123",
    "stream_status": "complete",
    "has_renderable_payload": true,
    "renderable_unit_count": 1,
    "renderable_unit_type": "field",
    "conversation_id": "conv_abc123"
  },
  "metadata": {
    "content_type": "json",
    "source_type": "artifact"
  }
}
Use artifact_started to open a placeholder, artifact_progress to update streaming UI, artifact_completed to fetch the persisted artifact by ID, and artifact_error to show a failed artifact state. Fetch full artifact payloads through the artifact APIs; the stream event is a lifecycle notification and compact render hint, not the durable artifact record.
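The lifecycle mapping above can be sketched as a small state tracker. ArtifactTracker is illustrative, and fetch_artifact is a hypothetical callback standing in for whichever artifact API call your client makes. The sketch also assumes every artifact lifecycle event carries content.artifact_id, as the artifact_completed example does.

```python
from typing import Any, Callable

LIFECYCLE = {"artifact_started", "artifact_progress", "artifact_completed", "artifact_error"}


class ArtifactTracker:
    """Per-artifact UI state driven by artifact lifecycle events."""

    def __init__(self, fetch_artifact: Callable[[str], Any]) -> None:
        # Hypothetical callback: fetch the durable artifact record by ID.
        self.fetch_artifact = fetch_artifact
        self.states: dict[str, dict[str, Any]] = {}

    def feed(self, event: dict[str, Any]) -> None:
        kind = event.get("type")
        if kind not in LIFECYCLE:
            return
        content = event.get("content") or {}
        artifact_id = content["artifact_id"]  # assumed present on every lifecycle event
        state = self.states.setdefault(artifact_id, {})
        if kind == "artifact_started":
            state["status"] = "placeholder"  # open a placeholder in the UI
        elif kind == "artifact_progress":
            state["status"] = "streaming"
            state["progress"] = content  # compact render hint only
        elif kind == "artifact_completed":
            # The event is a notification; the persisted record comes from the artifact API.
            state["status"] = "ready"
            state["payload"] = self.fetch_artifact(artifact_id)
        else:
            state["status"] = "error"
            state["error"] = content
```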

SDK event projection

The Python SDK parses raw wire events into semantic event classes. For example, a wire delta with visible text becomes TextDelta, a reasoning delta becomes ReasoningDelta, and the root end becomes FinalResponse.
from agentflow.events import (
    ArtifactProgress,
    ArtifactResultDelta,
    ArtifactStatus,
    ErrorEvent,
    FinalResponse,
    QuestionRequired,
    RefinementEvent,
    TextDelta,
    ToolProgress,
    ToolResultDelta,
)


async def analyze_pipeline(client, agent_id: str) -> None:
    # `client` is an AgentFlow SDK client; `async for` must run inside a coroutine.
    async for event in client.agents.stream(agent_id=agent_id, message="Analyze Q3 pipeline"):
        if isinstance(event, TextDelta):
            print(event.text, end="", flush=True)
        elif isinstance(event, QuestionRequired):
            print(event.question)
        elif isinstance(event, RefinementEvent):
            print("Refining answer...")
        elif isinstance(event, ToolProgress):
            print(f"Tool progress: {event.content}")
        elif isinstance(event, ToolResultDelta):
            print(f"Tool result update: {event.content}")
        elif isinstance(event, ArtifactProgress):
            print(f"Artifact progress: {event.status}")
        elif isinstance(event, ArtifactResultDelta):
            print(f"Artifact result update: {event.content}")
        elif isinstance(event, ArtifactStatus) and event.type == "artifact_completed":
            print(f"Artifact ready: {event.artifact_id}")
        elif isinstance(event, ErrorEvent):
            raise RuntimeError(event.message)
        elif isinstance(event, FinalResponse):
            print(event.text)
Every typed SDK event includes the original backend SSE payload on event.raw. When you want unprojected backend dictionaries instead of typed events, pass raw=True or call stream_raw(...).
async for event in client.agents.stream(agent_id=agent_id, message="Analyze Q3 pipeline"):
    print(event.type, event.raw.get("seq"), event.raw.get("content"))
async for event in client.agents.stream(agent_id=agent_id, message="Analyze Q3 pipeline", raw=True):
    print(event["type"], event.get("seq"), event.get("content"))
Direct tool execution exposes the same pattern with client.agents.tools.stream_raw(...).

Consuming REST streams

SSE frames can be split across network chunks, can contain comments, retry: hints, optional id: fields, and multiple data: lines. Parse complete SSE events separated by a blank line, concatenate all data: lines with \n, and ignore comments that start with :.

Python (httpx)

import json
import httpx

async def iter_sse_events(response):
    event = {"data": []}

    async for line in response.aiter_lines():
        if line == "":
            if event["data"]:
                yield {
                    "id": event.get("id"),
                    "event": event.get("event", "message"),
                    "data": "\n".join(event["data"]),
                }
            event = {"data": []}
            continue

        if line.startswith(":"):
            continue

        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]

        if field == "data":
            event["data"].append(value)
        elif field in {"event", "id", "retry"}:
            event[field] = value

    if event["data"]:
        yield {
            "id": event.get("id"),
            "event": event.get("event", "message"),
            "data": "\n".join(event["data"]),
        }

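async def stream_chat(base_url: str, agent_id: str, token: str) -> None:
    # httpx's default 5-second read timeout can fire while a long run is quiet
    # between events, so disable the read timeout for this stream.
    async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, read=None)) as client:
        async with client.stream(
            "POST",
            f"{base_url}/api/v1/agent/{agent_id}/chat",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            json={
                "message": "Research Acme Corp",
                "conversation_id": "conv_001",
                "message_id": "msg_001",
                "stream": True,
            },
        ) as response:
            response.raise_for_status()  # e.g. 409 duplicate submit or 503 at capacity
            async for frame in iter_sse_events(response):
                event = json.loads(frame["data"])
                if event["type"] == "delta" and isinstance(event.get("content"), str):
                    print(event["content"], end="", flush=True)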

JavaScript (fetch + ReadableStream)

const response = await fetch(`/api/v1/agent/${agentId}/chat`, {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${token}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    message: 'Research Acme Corp',
    conversation_id: 'conv_001',
    message_id: 'msg_001',
    stream: true,
  }),
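});

if (!response.ok) {
  // e.g. 409 duplicate submit or 503 when the replica is at capacity
  throw new Error(`Chat request failed with HTTP ${response.status}`);
}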

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let lastEventId = null;

function handleFrame(frame) {
  const parsed = { data: [] };
  for (const line of frame.split(/\r?\n/)) {
    if (!line || line.startsWith(':')) continue;
    const index = line.indexOf(':');
    const field = index === -1 ? line : line.slice(0, index);
    let value = index === -1 ? '' : line.slice(index + 1);
    if (value.startsWith(' ')) value = value.slice(1);

    if (field === 'data') parsed.data.push(value);
    if (field === 'id') parsed.id = value;
    if (field === 'event') parsed.event = value;
  }

  if (parsed.id) lastEventId = parsed.id;
  if (!parsed.data.length) return;

  const event = JSON.parse(parsed.data.join('\n'));
  if (event.type === 'delta' && typeof event.content === 'string') {
    document.getElementById('output').textContent += event.content;
  }
}

while (true) {
  const { done, value } = await reader.read();
  buffer += decoder.decode(value || new Uint8Array(), { stream: !done });

  const frames = buffer.split(/\r?\n\r?\n/);
  buffer = frames.pop() || '';

  for (const frame of frames) {
    handleFrame(frame);
  }

  if (done) {
    if (buffer.trim()) handleFrame(buffer);
    break;
  }
}

Reconnect and resume

Disconnecting from a chat SSE stream only detaches that subscriber from the run. It does not mean the user cancelled the agent. To stop work, call the cancellation API explicitly. For robust UIs, pair chat submission with the durable conversation stream:
GET /api/v1/conversations/{conversation_id}/stream
Authorization: Bearer $TOKEN
The conversation stream emits:
id: conversation:conv_001:seq:12
data: {"type":"conversation_snapshot","conversation_id":"conv_001","content":{"is_active":true,"runs":[...],"messages":[...],"cursor":{"max_seq":12}},"seq":12}

id: conversation:conv_001:seq:14
data: {"type":"conversation_update","conversation_id":"conv_001","content":{"is_active":true,"runs":[...],"messages":[...],"cursor":{"max_seq":14}},"seq":14}

id: conversation:conv_001:seq:18
data: {"type":"conversation_stream_complete","conversation_id":"conv_001","content":{"conversation_id":"conv_001","cursor":{"max_seq":18}},"seq":18}
Use this endpoint after a network interruption, tab navigation, or a duplicate-submit 409 response to rebuild UI state from persisted messages, artifacts, active runs, and the cursor. Active run snapshots include heartbeat metadata such as last_heartbeat_at; clients can use it to show that detached work is still alive.

This recovery stream is snapshot-based. AgentFlow emits stable SSE id: fields for conversation snapshots, such as conversation:{conversation_id}:seq:{max_seq}. Persist the most recent id and send it as Last-Event-ID on reconnect; the server returns a fresh full snapshot with recovery metadata, then continues with changed snapshots while the run is active.
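Active run snapshots carry last_heartbeat_at, which a client can use to label detached work as alive or stale. This is a sketch under stated assumptions: each entry in content.runs is a dict with an ISO 8601 last_heartbeat_at string that includes a UTC offset or a trailing Z, and the 60-second staleness window is an arbitrary choice, not an AgentFlow setting. run_is_alive and alive_runs are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def parse_timestamp(value: str) -> datetime:
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11, so normalize it.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def run_is_alive(
    run: dict[str, Any],
    now: Optional[datetime] = None,
    stale_after: timedelta = timedelta(seconds=60),
) -> bool:
    """True if the run reported a heartbeat within stale_after of now (timezone-aware)."""
    now = now or datetime.now(timezone.utc)
    heartbeat = run.get("last_heartbeat_at")
    if not heartbeat:
        return False
    return now - parse_timestamp(heartbeat) <= stale_after


def alive_runs(snapshot: dict[str, Any], now: Optional[datetime] = None) -> list[dict[str, Any]]:
    """Filter the runs in a conversation_snapshot/update payload to those still heartbeating."""
    return [run for run in snapshot["content"].get("runs", []) if run_is_alive(run, now)]
```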

Stable message IDs and idempotency

Generate conversation_id once per conversation and a stable message_id once per user-submitted message. Reuse the same message_id when retrying the same user action after a timeout, refresh, or duplicate submit. Do not mint a fresh message_id for every network retry; that can create duplicate turns. For direct tool runs, use idempotency_key for side-effecting calls. The key prevents duplicate in-flight direct executions; it is not a durable replay cache for completed results. SDK helpers mint IDs when omitted for convenience, but production chat UIs should pass stable IDs from their own message model.
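One way to keep message_id stable is to mint it when the user acts and store it on the client's own message object, so every retry of that action builds the same request body. PendingMessage and the msg_ prefix are hypothetical; the body fields match the chat request in the REST streaming examples.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class PendingMessage:
    """A user turn in the client's own message model (hypothetical shape)."""

    conversation_id: str
    text: str
    # Minted once when the user submits; reused on every retry of this same action.
    message_id: str = field(default_factory=lambda: f"msg_{uuid.uuid4().hex}")

    def request_body(self) -> dict:
        return {
            "message": self.text,
            "conversation_id": self.conversation_id,
            "message_id": self.message_id,
            "stream": True,
        }
```

A network retry calls request_body() again on the same PendingMessage; only a genuinely new user submission creates a new instance and therefore a new message_id.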

Detached chat streams

For chat, AgentFlow consumes the agent run in a detached server task and relays events to the current SSE subscriber. If the browser tab closes, the relay detaches and the run keeps going. If the subscriber falls behind the bounded relay queue, live delivery is dropped but durable conversation state continues to be written. Clients should treat the original chat SSE stream as a live convenience, not the source of truth. The durable recovery loop is:
  1. Submit the chat request with stable conversation_id and message_id.
  2. Render live events while connected.
  3. On navigation, network loss, queue overflow, or duplicate-submit 409, open /api/v1/conversations/{conversation_id}/stream.
  4. Rebuild from conversation_snapshot and apply conversation_update events until the run completes.

Cancellation

Cancellation is explicit and cooperative. Disconnecting from a chat stream does not cancel the root run.
# List active executions
GET /api/v1/cancel/active-calls
Authorization: Bearer $TOKEN
{
  "success": true,
  "active_calls": ["call_1", "call_2"],
  "count": 2
}
# Cancel a specific execution
POST /api/v1/cancel/{call_id}
Authorization: Bearer $TOKEN
{
  "success": true,
  "message": "Cancellation requested for call call_1",
  "call_id": "call_1",
  "fully_cancelled": false
}
# Cancel by server-side conversation run ID
POST /api/v1/cancel/run/{run_id}
Authorization: Bearer $TOKEN

# Cancel by the client-supplied conversation/message IDs
POST /api/v1/cancel/conversation-message
Authorization: Bearer $TOKEN
{
  "conversation_id": "conv_001",
  "message_id": "msg_001"
}
Use the run and conversation-message endpoints when the UI has not yet observed the stream’s root_call_id. Each conversation_runs row is created from the client-supplied conversation_id and message_id, so once that row exists, either the run ID or the conversation_id/message_id pair works as an early cancellation handle.
# Cancel all active executions for the current user
POST /api/v1/cancel/all
Authorization: Bearer $TOKEN
{
  "success": true,
  "message": "Cancellation requested for 2 active calls",
  "cancelled_calls": ["call_1", "call_2"],
  "count": 2
}
# Cancel a queued or running background task
POST /api/v1/cancel/background-task/{task_id}
Authorization: Bearer $TOKEN
Queued tasks return status: "cancelled"; running tasks with a cancellation handle return status: "cancellation_requested". The SDK exposes the same contract:
run = conversation.run_handle("Analyze Acme")

async for event in run:
    if should_stop(event):
        await run.cancel()
        break
run.cancel() cancels the root call once the stream has observed its root_call_id. You can also call await client.cancellations.cancel(call_id) directly. SDK hooks can request background cancellation too: returning Block(...) from a pre_tool_use hook calls client.cancellations.cancel(event.call_id) for the in-flight tool, and returning Block(...) from an approval hook denies the pending approval. Cancellation is observed between LLM chunks, tool dispatches, sub-agent turns, and other cooperative checkpoints. A tool or provider call that is already inside a non-interruptible network request may finish before the cancellation signal is observed. Background tasks use their task_id; running foreground executions use call_id.
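When a UI needs to cancel before it has seen root_call_id, it can fall back through whichever handles it has. A minimal sketch: cancel_request is a hypothetical helper that only chooses the endpoint path and JSON body from the cancellation routes documented above; send the result as a POST with the usual Authorization: Bearer header using your HTTP client of choice.

```python
from typing import Any, Optional, Tuple


def cancel_request(
    root_call_id: Optional[str] = None,
    run_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
    message_id: Optional[str] = None,
) -> Tuple[str, Optional[dict[str, Any]]]:
    """Pick the most specific cancellation endpoint the client currently knows."""
    if root_call_id:
        # Once the stream has shown the root call, cancel it directly.
        return f"/api/v1/cancel/{root_call_id}", None
    if run_id:
        return f"/api/v1/cancel/run/{run_id}", None
    if conversation_id and message_id:
        # Early handle: works once the conversation_runs row exists.
        body = {"conversation_id": conversation_id, "message_id": message_id}
        return "/api/v1/cancel/conversation-message", body
    raise ValueError("no cancellation handle is known yet")
```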

Observability

AgentFlow tags execution spans with call_id, parent_call_id, agent name, role, tenant, user, and request context where available. Root agent and direct root sub-agent runs create Datadog LLM Observability workflow spans; nested tools and sub-agents create child execution spans and LLMObs annotations. The LLMObs session_id is the conversation_id when available, otherwise the root or current call ID. Use these IDs consistently:
| ID | Use |
|---|---|
| conversation_id | User-facing session and Datadog LLMObs session correlation |
| message_id | The user turn that caused artifacts, tool calls, or errors |
| root_call_id | One execution tree across root agent, sub-agents, and tools |
| parent_call_id | Nesting relationship for timelines and span trees |
| call_id | Cancellation, execution metrics, and per-call logs |
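To make client-side logs joinable with server spans, a client can stamp the same IDs on every log record. This sketch uses the standard library's logging.LoggerAdapter; correlated_logger and llmobs_session_id are hypothetical helpers, and the session-ID fallback mirrors the server rule described above, read as: conversation_id, else the root call ID, else the current call ID.

```python
import logging
from typing import Any, Optional


def llmobs_session_id(conversation_id: Optional[str], root_call_id: Optional[str], call_id: str) -> str:
    """conversation_id when available, otherwise the root call ID, otherwise the current call ID."""
    return conversation_id or root_call_id or call_id


def correlated_logger(
    logger: logging.Logger,
    conversation_id: Optional[str],
    message_id: Optional[str],
    event: dict[str, Any],
) -> logging.LoggerAdapter:
    """Return an adapter that adds the stream's correlation IDs to every record."""
    return logging.LoggerAdapter(
        logger,
        {
            "conversation_id": conversation_id,
            "message_id": message_id,
            "root_call_id": event.get("root_call_id"),
            "parent_call_id": event.get("parent_call_id"),
            "call_id": event.get("call_id"),
        },
    )
```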
Datadog LLM Observability is enabled when the server runs with DD_TRACE_ENABLED=true, DD_LLMOBS_ENABLED=true, and DD_LLMOBS_ML_APP set to a lowercase application name.
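A deployment preflight can catch a missing or mis-cased setting at startup. A sketch, assuming the two flags are compared case-insensitively against "true"; llmobs_config_problems is a hypothetical helper, not part of AgentFlow or ddtrace.

```python
import os
from typing import Mapping, Optional


def llmobs_config_problems(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return human-readable problems with the LLMObs settings; empty means OK."""
    env = os.environ if env is None else env
    problems = []
    for flag in ("DD_TRACE_ENABLED", "DD_LLMOBS_ENABLED"):
        if env.get(flag, "").lower() != "true":
            problems.append(f"{flag} must be set to true")
    app = env.get("DD_LLMOBS_ML_APP", "")
    if not app:
        problems.append("DD_LLMOBS_ML_APP is not set")
    elif app != app.lower():
        problems.append("DD_LLMOBS_ML_APP must be lowercase")
    return problems
```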

Infrastructure

| Feature | Detail |
|---|---|
| Max connections | Per-replica SSE connection cap; new streams return HTTP 503 when full |
| Connection timeout | Long-running streams are bounded by the server’s SSE timeout |
| Retry hint | Streams send an SSE retry: directive before data events |
| Heartbeat | Durable conversation runs expose last_heartbeat_at while background execution is active |
| Nginx buffering | Disabled with X-Accel-Buffering: no |
| Disconnect behavior | Chat and conversation streams detach on client disconnect; explicit cancel endpoints stop work |
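Because a full replica answers new streams with HTTP 503, clients should wait before reopening a stream. This sketch uses capped exponential backoff with full jitter, a common technique and an assumption here rather than AgentFlow guidance, and treats a previously received SSE retry: value (milliseconds, per the SSE specification) as a lower bound. reconnect_delay and its defaults are hypothetical.

```python
import random
from typing import Optional


def reconnect_delay(
    attempt: int,
    retry_hint_ms: Optional[int] = None,
    base: float = 1.0,
    cap: float = 30.0,
) -> float:
    """Seconds to wait before reopening a stream after a 503 or a dropped connection."""
    # Full jitter: pick uniformly between 0 and the capped exponential bound.
    delay = random.uniform(0.0, min(cap, base * 2 ** attempt))
    if retry_hint_ms is not None:
        # Never reconnect sooner than the server's last SSE retry: hint.
        delay = max(delay, retry_hint_ms / 1000.0)
    return delay
```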