Claude API streaming Python: stream responses token-by-token
By default, the Claude API waits until the model finishes generating before returning anything. For chat apps or long responses, that means several seconds of silence before the user sees a word. Streaming changes that: each text token is sent to your client as soon as it is generated, so output appears progressively. This guide covers the Anthropic Python SDK v0.34+ patterns — sync context manager, raw SSE events, async, tool use, and error handling.
The 30-second answer
- Recommended pattern: use the context manager client.messages.stream() and iterate stream.text_stream; it handles cleanup automatically.
- After the stream: call stream.get_final_message() to get usage stats and the full Message object.
- For async frameworks (FastAPI): use anthropic.AsyncAnthropic() with async with and async for.
- Raw events: use client.messages.create(..., stream=True) if you need to inspect individual SSE event types (e.g. to handle tool use blocks).
Why streaming matters
Without streaming, a 500-token Claude response at normal generation speed takes roughly 5–10 seconds before your user sees anything. With streaming, the first token arrives in under a second in most cases, and the text scrolls into view as it is generated. Beyond UX, streaming lets you begin processing output — parsing structured data, piping to TTS, detecting early stop conditions — before the full response is ready.
The Anthropic Python SDK exposes two streaming APIs: a high-level context manager (client.messages.stream()) and a lower-level raw event stream (client.messages.create(..., stream=True)). For most use cases, the context manager is the right choice.
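As a rough illustration of processing output before the full response is ready, the sketch below groups incoming text chunks into complete sentences, the kind of unit a TTS pipeline might consume. The name iter_sentences and the sentence-boundary regex are assumptions made for this example and are not part of the SDK. The function accepts any iterable of strings, so a list of fake chunks works as well as a live text stream.

```python
import re
from typing import Iterable, Iterator

# Simplified sentence boundary: '.', '!' or '?' followed by whitespace.
# Real text (abbreviations, decimals) needs a more careful rule.
_BOUNDARY = re.compile(r"(?<=[.!?])\s+")


def iter_sentences(chunks: Iterable[str]) -> Iterator[str]:
    """Yield complete sentences as soon as enough chunks have arrived."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        parts = _BOUNDARY.split(buffer)
        # Every part except the last is a finished sentence.
        for sentence in parts[:-1]:
            yield sentence
        # The last part may still be growing, so keep it buffered.
        buffer = parts[-1]
    # Flush whatever is left once the stream ends.
    if buffer.strip():
        yield buffer.strip()
```

With a live stream, this could be called as iter_sentences(stream.text_stream) inside the with block, handing each sentence to the next stage as it completes.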
Basic streaming with the context manager (recommended)
The context manager pattern is the clearest way to stream. The with block ensures the underlying HTTP connection is always closed, even if your loop raises an exception.
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short poem about APIs."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# After the stream closes, get the full Message with usage stats
message = stream.get_final_message()
print(f"\n\nInput tokens: {message.usage.input_tokens}")
print(f"Output tokens: {message.usage.output_tokens}")
stream.text_stream is a generator that yields text delta strings: only the new characters added by each event, not the text so far. The end="" and flush=True arguments to print() ensure output appears immediately without newlines between chunks. stream.get_final_message() is safe to call after the with block exits, provided the loop consumed the whole stream; the final message is assembled from the events as they are read.
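Because each item is only a delta, the caller has to join the pieces to recover the full text. The sketch below does that and also records how long the first chunk took to arrive, which is the latency streaming is meant to improve. The helper name consume_with_timing and its parameters are hypothetical; it works on any iterable of strings.

```python
import time
from typing import Callable, Iterable, Optional, Tuple


def consume_with_timing(
    chunks: Iterable[str],
    on_text: Optional[Callable[[str], None]] = None,
    clock: Callable[[], float] = time.perf_counter,
) -> Tuple[str, Optional[float]]:
    """Drain a text stream; return (full_text, seconds_until_first_chunk).

    The second value is None when the stream produced no chunks at all.
    """
    start = clock()
    first_chunk_after: Optional[float] = None
    parts = []
    for chunk in chunks:
        if first_chunk_after is None:
            first_chunk_after = clock() - start
        parts.append(chunk)
        if on_text is not None:
            on_text(chunk)  # e.g. print the delta or forward it to a client
    return "".join(parts), first_chunk_after
```

Inside the with block, something like consume_with_timing(stream.text_stream, on_text=lambda t: print(t, end="", flush=True)) would print progressively and still return the complete text.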
Raw SSE events and event types
If you need to handle every event type explicitly — for example, to detect tool use blocks or log timing data — use the lower-level raw stream. Pass stream=True to client.messages.create():
import anthropic

client = anthropic.Anthropic()

with client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain rate limiting in one paragraph."}],
    stream=True,
) as stream:
    for event in stream:
        print(event.type, event)
The Claude API sends six core event types over the SSE connection. The wire protocol also includes ping keep-alive events and an error event for mid-stream failures.
- message_start: sent once at the beginning; contains the initial Message object (with empty content and usage placeholders).
- content_block_start: marks the beginning of a new content block (type text or tool_use).
- content_block_delta: carries the actual data increment, either text_delta for text blocks (the characters to display) or input_json_delta for tool use blocks (partial JSON to accumulate).
- content_block_stop: signals a content block is complete.
- message_delta: sent near the end; carries the stop_reason (e.g. end_turn, tool_use, max_tokens) and final output token usage.
- message_stop: final event; the stream is finished.
For text-only use cases, you only need to handle content_block_delta events where delta.type == "text_delta". The high-level context manager does this filtering for you automatically.
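The filtering described above can be sketched as a small function over the raw events. The attribute names used here (event.type, event.delta.type, event.delta.text, event.delta.stop_reason) follow the event shapes listed in this section; the function name read_stream is hypothetical, and the fake events built with SimpleNamespace only stand in for SDK objects so the example runs offline.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Optional, Tuple


def read_stream(events: Iterable[Any]) -> Tuple[str, Optional[str]]:
    """Collect text deltas and the stop reason from raw stream events."""
    text_parts = []
    stop_reason = None
    for event in events:
        if event.type == "content_block_delta":
            # Only text deltas carry displayable characters.
            if event.delta.type == "text_delta":
                text_parts.append(event.delta.text)
        elif event.type == "message_delta":
            stop_reason = event.delta.stop_reason
        # All other event types are ignored in this text-only sketch.
    return "".join(text_parts), stop_reason


# Stand-in events for illustration; a real stream yields SDK objects.
fake_events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_start"),
    SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(type="text_delta", text="Hi"),
    ),
    SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(type="text_delta", text=" there"),
    ),
    SimpleNamespace(type="content_block_stop"),
    SimpleNamespace(
        type="message_delta",
        delta=SimpleNamespace(stop_reason="end_turn"),
    ),
    SimpleNamespace(type="message_stop"),
]
```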
Async streaming for FastAPI and asyncio apps
If your application uses an async framework — FastAPI, aiohttp, Starlette — use anthropic.AsyncAnthropic(). The interface mirrors the sync version exactly, but with async with and async for:
import anthropic
import asyncio

async def stream_response():
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
    message = await stream.get_final_message()
    print(f"\nTokens used: {message.usage.output_tokens}")

asyncio.run(stream_response())
In a FastAPI route, yield chunks from the async stream into a StreamingResponse:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.get("/stream")
async def stream_claude():
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": "Tell me something interesting."}],
        ) as stream:
            async for text in stream.text_stream:
                yield text

    return StreamingResponse(generate(), media_type="text/plain")
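The route above sends plain text. If the consumer is a browser using EventSource, the chunks would need Server-Sent Events framing instead. The following is a minimal sketch of such a wrapper, assuming a JSON payload with a "text" key and a closing "done" event; both are conventions invented for this example, as is the function name to_sse.

```python
import asyncio
import json
from typing import AsyncIterator


async def to_sse(chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Wrap text chunks in SSE frames ("data: ...", then a blank line)."""
    async for chunk in chunks:
        # JSON-encode the chunk so a newline inside it cannot end the frame early.
        yield f"data: {json.dumps({'text': chunk})}\n\n"
    # Hypothetical end-of-stream marker so the client knows when to stop listening.
    yield "event: done\ndata: {}\n\n"


async def collect_frames(chunks: AsyncIterator[str]) -> list:
    """Helper for trying the wrapper outside a web framework."""
    return [frame async for frame in to_sse(chunks)]
```

In the FastAPI route, this would mean returning StreamingResponse(to_sse(generate()), media_type="text/event-stream") rather than the text/plain response.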
Streaming with tool use
When the model decides to call a tool mid-generation, the stream transitions from text output to a tool use block. You will see a content_block_start event with content_block.type == "tool_use", followed by a sequence of content_block_delta events where delta.type == "input_json_delta". Each input_json_delta carries a partial JSON string fragment that you must accumulate and parse after content_block_stop.
import json
import anthropic

client = anthropic.Anthropic()

tool_input_parts = []      # partial JSON fragments for the tool call
current_block_type = None  # type of the content block currently streaming

with client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    tools=[{
        "name": "get_weather",
        "description": "Get current weather for a city.",
        "input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    }],
    messages=[{"role": "user", "content": "What is the weather in Paris?"}],
    stream=True,
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            current_block_type = event.content_block.type
        elif event.type == "content_block_delta":
            if current_block_type == "tool_use":
                tool_input_parts.append(event.delta.partial_json)
            elif current_block_type == "text":
                print(event.delta.text, end="", flush=True)

# This simple version assumes at most one tool call per response.
if tool_input_parts:
    # A tool with no arguments can stream an empty string, so fall back to "{}".
    tool_input = json.loads("".join(tool_input_parts) or "{}")
    print(f"\nTool called with input: {tool_input}")
Note that message_delta.stop_reason will be "tool_use" rather than "end_turn" when the model ends its turn by calling a tool. The high-level stream.text_stream generator skips tool use events and only yields text — use the raw event loop when handling tools.
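A response can contain more than one tool use block, in which case joining every fragment into a single string would produce invalid JSON. One way to handle that, sketched below, is to key the fragments by the block's index and parse each block when its content_block_stop arrives. The function name collect_tool_calls and the returned dict layout are assumptions for this example; the event attributes (index, content_block.id, content_block.name, delta.partial_json) follow the event shapes described in this guide.

```python
import json
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def collect_tool_calls(events: Iterable[Any]) -> List[Dict[str, Any]]:
    """Return one {"id", "name", "input"} dict per completed tool use block."""
    pending: Dict[int, Dict[str, Any]] = {}  # block index -> call being built
    finished: List[Dict[str, Any]] = []
    for event in events:
        if (
            event.type == "content_block_start"
            and event.content_block.type == "tool_use"
        ):
            pending[event.index] = {
                "id": event.content_block.id,
                "name": event.content_block.name,
                "parts": [],
            }
        elif (
            event.type == "content_block_delta"
            and event.delta.type == "input_json_delta"
        ):
            pending[event.index]["parts"].append(event.delta.partial_json)
        elif event.type == "content_block_stop" and event.index in pending:
            call = pending.pop(event.index)
            raw = "".join(call["parts"])
            finished.append({
                "id": call["id"],
                "name": call["name"],
                # No fragments means the tool was called without arguments.
                "input": json.loads(raw) if raw else {},
            })
    return finished


def fake_event(**fields: Any) -> SimpleNamespace:
    """Build a stand-in event for offline experiments (not an SDK type)."""
    return SimpleNamespace(**fields)
```

Text blocks and message-level events pass through untouched, so the same loop can be extended to print text deltas as well.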
Error handling during streaming
Errors from the Claude API can arrive in two places. Before streaming starts — a 429 rate limit or 529 overloaded error — the SDK raises an anthropic.RateLimitError or anthropic.APIStatusError before entering the stream loop, so a standard try/except around the with block catches them. Once streaming has begun, mid-stream errors are less common but can occur if the connection drops or the server encounters a problem; the SDK raises an exception from inside the generator in that case.
import anthropic

client = anthropic.Anthropic()

try:
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.RateLimitError as e:
    print(f"Rate limited (429): {e}. Back off and retry.")
except anthropic.APIStatusError as e:
    if e.status_code == 529:
        print("Claude is overloaded. Retry with exponential backoff.")
    else:
        print(f"API error {e.status_code}: {e.message}")
except anthropic.APIConnectionError as e:
    print(f"Connection lost during stream: {e}")
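For production workloads, implement exponential backoff with jitter on 429 and 529 responses. The SDK retries the initial request automatically (twice by default, configurable through the client's max_retries option), but it cannot resume a stream that fails after output has started, so mid-stream failures need your own retry logic.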
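The backoff advice can be sketched as a generic helper that wraps the whole streaming call. This version uses "full jitter" (a random delay between zero and an exponentially growing ceiling). The name retry_with_backoff and all of its parameters are hypothetical, and the default delays are placeholders rather than recommended values.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(); on a retryable error, wait and try again."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Ceiling doubles each attempt (base, 2*base, 4*base, ...), capped.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

A caller might pass a function that runs the full with client.messages.stream(...) block and returns the collected text, with retry_on set to the exception classes worth retrying, for example (anthropic.RateLimitError, anthropic.APIConnectionError). Note that a retry restarts generation from the beginning, so any partial output already shown to the user has to be discarded or reconciled.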
FAQ
- How do I stream a Claude API response in Python?
  - Use the context manager pattern: with client.messages.stream(...) as stream: and iterate over stream.text_stream to get text deltas as they arrive. After the stream closes, call stream.get_final_message() to get the full Message object with usage statistics.
- What is the difference between stream=True and client.messages.stream()?
  - client.messages.stream() is the high-level context manager; it handles cleanup automatically and exposes stream.text_stream for easy iteration. client.messages.create(..., stream=True) is the lower-level approach that gives you raw MessageStreamEvent objects, useful when you need to handle all event types explicitly (message_start, content_block_delta, etc.).
- Can I use Claude API streaming with FastAPI or other async frameworks?
  - Yes. Use anthropic.AsyncAnthropic() and the async context manager: async with client.messages.stream(...) as stream: with async for text in stream.text_stream:. This integrates directly with FastAPI's StreamingResponse or any asyncio-based framework.
Last updated May 28, 2026. Code examples verified against Anthropic Python SDK v0.34+ and the Anthropic streaming API documentation.