Batch

The Batch class supports two modes of bulk processing: sync completions for immediate results on small-to-medium datasets, and async batches for large-scale background jobs.

Sync completions

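Sync methods process items in memory and return results directly from the call. They are still coroutines, as the `await` in each example shows, so call them from inside an async function. Use them for datasets that fit in a single request.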

Labeling

import agentflow as af

items = [
    "The product is amazing, love the new features",
    "Terrible support experience, waited 3 hours",
    "It's okay, nothing special",
]

results = await af.Batch.label(items, categories=["positive", "negative", "neutral"])
# [{"label": "positive", "confidence": 0.95}, ...]
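
The result shape in the comment above suggests one entry per input item. As a minimal sketch, assuming results come back in input order and that `confidence` is a float between 0 and 1 (neither is confirmed on this page), low-confidence labels can be separated out for manual review. The helper `split_by_confidence` and the 0.8 threshold are hypothetical and not part of agentflow.

```python
from typing import Any


def split_by_confidence(
    items: list[str],
    results: list[dict[str, Any]],
    threshold: float = 0.8,
) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
    """Pair each item with its label and split the pairs by confidence.

    Assumes results[i] belongs to items[i]; the threshold is illustrative.
    """
    if len(items) != len(results):
        raise ValueError("items and results must have the same length")

    accepted, needs_review = [], []
    for item, result in zip(items, results):
        pair = (item, result["label"])
        if result["confidence"] >= threshold:
            accepted.append(pair)
        else:
            needs_review.append(pair)
    return accepted, needs_review


# Stand-in results in the shape shown above; real values would come from Batch.label.
sample_items = ["Great product", "It's okay, nothing special"]
sample_results = [
    {"label": "positive", "confidence": 0.95},
    {"label": "neutral", "confidence": 0.55},
]
accepted, needs_review = split_by_confidence(sample_items, sample_results)
```

The length check guards against silently mislabeling items if the two lists ever fall out of step.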

Extracting

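# Illustrative input text, chosen to match the sample output below
documents = [
    "Acme reported revenue of $1.5 million for fiscal year 2024",
]

# Field names mapped to the type each extracted value should have
schema = {
    "company": "string",
    "revenue": "number",
    "year": "integer",
}

results = await af.Batch.extract(documents, schema=schema)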
# [{"company": "Acme", "revenue": 1500000.0, "year": 2024}, ...]
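
Extracted records can be checked against the schema before they are used downstream. The sketch below assumes the schema's type names map to Python types as `"string"` to `str`, `"number"` to `int` or `float`, and `"integer"` to `int`; that mapping and the helper `find_type_errors` are assumptions for illustration, not documented agentflow behavior.

```python
from typing import Any

# Assumed mapping from the schema's type names to Python types.
SCHEMA_TYPES: dict[str, tuple[type, ...]] = {
    "string": (str,),
    "number": (int, float),
    "integer": (int,),
}


def find_type_errors(record: dict[str, Any], schema: dict[str, str]) -> list[str]:
    """Return a list of human-readable problems for one extracted record."""
    problems = []
    for field, type_name in schema.items():
        if field not in record:
            problems.append(f"{field}: missing")
            continue
        value = record[field]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, SCHEMA_TYPES[type_name]):
            problems.append(f"{field}: expected {type_name}, got {type(value).__name__}")
    return problems


schema = {"company": "string", "revenue": "number", "year": "integer"}
good = {"company": "Acme", "revenue": 1500000.0, "year": 2024}
bad = {"company": "Acme", "revenue": "1.5M"}

good_problems = find_type_errors(good, schema)
bad_problems = find_type_errors(bad, schema)
```

Returning a list of problems rather than raising on the first one makes it easier to log every issue in a record at once.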

Summarizing

results = await af.Batch.summarize(items)
# [{"summary": "Customer praises new features..."}, ...]

Enriching

results = await af.Batch.enrich(items, fields=["sentiment", "language", "topic"])
# [{"sentiment": "positive", "language": "en", "topic": "product"}, ...]

Generic operation

results = await af.Batch.run(
    items,
    operation="classify",
    instructions="Classify each item by department: sales, support, engineering",
)

Async batches

For large datasets, submit an async batch job, then either poll its status or pass a webhook_url to be notified when the job finishes.

Submitting a batch

batch = await af.Batch.submit(
    file_id="file_abc123",
    operation="label",
    categories=["positive", "negative", "neutral"],
    webhook_url="https://your-app.com/webhooks/batch-complete",
)

print(batch.id)      # "batch_xyz789"
print(batch.status)  # "queued"

Monitoring progress

# Refresh status from the server
await batch.refresh()

print(batch.status)    # "processing"
print(batch.progress)  # 0.45
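
A single `refresh()` call returns one snapshot, so waiting for completion usually means refreshing in a loop. The following is a sketch of such a loop, using only the `refresh()` method and the `id` and `status` attributes shown on this page. The helper `wait_for_batch`, its `interval` and `timeout` parameters, and the `FakeBatch` stand-in are hypothetical; the library may offer its own way to wait.

```python
import asyncio

# Terminal states taken from the status values listed under Properties.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


async def wait_for_batch(batch, interval: float = 5.0, timeout: float = 3600.0):
    """Poll batch.refresh() until the batch reaches a terminal status.

    Raises TimeoutError if the batch is still running after `timeout` seconds.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        await batch.refresh()
        if batch.status in TERMINAL_STATUSES:
            return batch
        if loop.time() >= deadline:
            raise TimeoutError(f"batch {batch.id} still {batch.status!r} after {timeout}s")
        await asyncio.sleep(interval)


class FakeBatch:
    """Stand-in for a submitted batch so the sketch runs without a server."""

    def __init__(self, statuses):
        self.id = "batch_xyz789"
        self._statuses = iter(statuses)
        self.status = "queued"
        self.refresh_calls = 0

    async def refresh(self):
        self.refresh_calls += 1
        self.status = next(self._statuses)


fake = FakeBatch(["queued", "processing", "completed"])
finished = asyncio.run(wait_for_batch(fake, interval=0.01, timeout=1.0))
```

The caller still needs to inspect `finished.status` afterwards, because the loop also returns for "failed" and "cancelled" batches.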

Retrieving output

# Wait until complete, then get results
output = await batch.get_output(format="jsonl")
print(output[:200])

# Or as parsed records
output = await batch.get_output(format="records")
for record in output:
    print(record)
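
The slice `output[:200]` above suggests that `format="jsonl"` returns a string. Assuming that is the case, and that each line holds one JSON object, the raw output can be parsed with the standard library alone. The helper `iter_jsonl` and the sample string are hypothetical.

```python
import json
from typing import Any, Iterator


def iter_jsonl(text: str) -> Iterator[dict[str, Any]]:
    """Yield one parsed object per non-empty line of a JSONL string."""
    for line_number, line in enumerate(text.splitlines(), start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, e.g. a trailing newline
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"invalid JSON on line {line_number}: {exc}") from exc


# Stand-in for the string returned with format="jsonl".
sample_output = (
    '{"label": "positive", "confidence": 0.95}\n'
    '{"label": "negative", "confidence": 0.91}\n'
)
records = list(iter_jsonl(sample_output))
```

Parsing line by line keeps memory use flat for large outputs and reports the exact line that failed to decode.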

Canceling a batch

await batch.cancel()

Properties

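| Property | Type | Description |
| --- | --- | --- |
| id | `str` | Unique batch identifier, for example "batch_xyz789" |
| status | `str` | "queued", "processing", "completed", "failed", or "cancelled" |
| progress | `float` | Fraction of the batch completed, from 0.0 to 1.0 |
| created_at | `datetime` | When the batch was submitted |
| completed_at | `datetime \| None` | When the batch finished |
| error | `str \| None` | Error message if the batch failed |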