Skip to main content
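The Python SDK provides async-first access to all AgentFlow APIs. The examples below use `await` and `async for`, so run them inside an `async` function (for example, one started with `asyncio.run()`) or in an async-aware REPL.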

Installation

pip install agentflow

Initialize

import agentflow as af

client = af.Client(endpoint="http://localhost:8000")

Agents

Chat

response = await af.chat("What can you do?", agent="MainAgent")
print(response)

Stream

async for event in af.chat_stream("Hello", agent="MainAgent"):
    if event.type == "DELTA":
        print(event.delta, end="", flush=True)

Continue an existing conversation

response = await af.chat(
    "Follow up on the last topic",
    agent="MainAgent",
    conversation_id="conv_abc123",
)

Knowledge Bases

results = await af.KnowledgeBase.search(
    "How does billing work?",
    kb_id="kb_123",
    top_k=5,
)
for result in results:
    print(result.content, result.score)

Conversations

List

conversations = await af.Conversation.list()

Get messages

messages = await af.Conversation.messages("conv_abc123")

System

Health check

status = await af.System.health()
print(status)