OpenAI API streaming Python: stream GPT responses token-by-token
Without streaming, your code waits for the entire GPT response before displaying anything — for a long answer, that can mean 10+ seconds of nothing. The OpenAI API supports server-sent events (SSE) streaming: pass stream=True and the model sends tokens as they are generated. This guide covers the OpenAI Python SDK v1.x patterns — basic streaming, the context manager form, async for FastAPI, tool call accumulation, and getting usage stats (which are not included by default).
The 30-second answer
- Enable streaming: add stream=True to client.chat.completions.create().
- Read text chunks: access chunk.choices[0].delta.content on each iteration, and check for None before printing.
- Usage stats are off by default: add stream_options={"include_usage": True} to get a final chunk with token counts.
- Async: use AsyncOpenAI() with async with await ... as stream: and async for chunk in stream:.
Basic streaming
The minimal streaming pattern: pass stream=True, iterate over the result, and read chunk.choices[0].delta.content. The delta content can be None on chunks that carry no text (for example the final chunk and tool call chunks), so always guard against it:
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a short poem about APIs."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
Each chunk is a ChatCompletionChunk object. The text you want is at chunk.choices[0].delta.content. The delta is additive — each string is a new fragment to append to what you have already received, not a replacement. The final chunk has chunk.choices[0].finish_reason set to "stop" (or "length", "tool_calls", etc.) and delta.content is None.
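Because each delta is a fragment rather than a replacement, a common pattern is to collect the fragments and join them once the stream ends. The sketch below runs without an API key by using hand-built stand-in objects in place of real ChatCompletionChunk objects. The helper names fake_chunk and collect_text are hypothetical and not part of the SDK.

```python
from types import SimpleNamespace


def fake_chunk(content, finish_reason=None):
    """Build a stand-in shaped like a ChatCompletionChunk (illustration only)."""
    delta = SimpleNamespace(content=content)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])


def collect_text(stream):
    """Append every non-empty delta fragment; return (full_text, finish_reason)."""
    parts = []
    finish_reason = None
    for chunk in stream:
        choice = chunk.choices[0]
        if choice.delta.content:
            parts.append(choice.delta.content)
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason
    return "".join(parts), finish_reason


# Simulated stream: fragments arrive in order, and the last chunk has no text.
simulated = [
    fake_chunk("Hello"),
    fake_chunk(", "),
    fake_chunk("world"),
    fake_chunk(None, finish_reason="stop"),
]
text, reason = collect_text(simulated)
print(text, reason)
```

With a real stream you would pass the object returned by create(..., stream=True) to the same loop.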
Context manager pattern (cleaner resource handling)
Using the stream as a context manager ensures the underlying HTTP connection is closed even if your loop raises an exception:
from openai import OpenAI

client = OpenAI()

with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
) as stream:
    for chunk in stream:
        if not chunk.choices:
            # Final usage chunk when stream_options={"include_usage": True}
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)
The if not chunk.choices: guard handles the edge case that arises when you enable usage stats (see below) — the final chunk has an empty choices list. Adding this guard now makes the code forward-compatible.
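To see what the with block buys you, here is a minimal sketch that swaps the SDK stream for a stand-in class. FakeStream is hypothetical and only mimics the context manager and iteration behaviour. It shows that close() still runs when the consuming loop raises partway through.

```python
class FakeStream:
    """Stand-in for an SDK stream; records whether close() was called."""

    def __init__(self, items):
        self._items = items
        self.closed = False

    def __iter__(self):
        return iter(self._items)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow the exception

    def close(self):
        self.closed = True


stream = FakeStream(["a", "b", "c"])
seen = []
try:
    with stream as s:
        for item in s:
            seen.append(item)
            if item == "b":
                raise RuntimeError("consumer failed mid-stream")
except RuntimeError:
    pass

print(seen, stream.closed)
```

Without the with block, the same failure would leave cleanup to garbage collection unless you wrapped the loop in your own try/finally.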
Async streaming for FastAPI and asyncio apps
For async applications, import AsyncOpenAI. The create() call with stream=True is awaitable and returns an async context manager:
import asyncio

from openai import AsyncOpenAI


async def main():
    client = AsyncOpenAI()
    async with await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    ) as stream:
        async for chunk in stream:
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                print(delta, end="", flush=True)


asyncio.run(main())
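The async loop can be exercised offline by replacing the SDK stream with an async generator. In this sketch, fake_async_stream and collect_async are hypothetical names. The generator yields stand-in chunks and ends with a chunk whose choices list is empty, so the guard is exercised as well.

```python
import asyncio
from types import SimpleNamespace


async def fake_async_stream(fragments):
    """Async generator standing in for the SDK's async stream (illustration only)."""
    for fragment in fragments:
        await asyncio.sleep(0)  # hand control back to the event loop, as a network read would
        delta = SimpleNamespace(content=fragment)
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])
    # Trailing chunk with no choices, like the usage chunk described in this guide.
    yield SimpleNamespace(choices=[])


async def collect_async(stream):
    """Consume an async stream and return the concatenated text."""
    parts = []
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            parts.append(delta)
    return "".join(parts)


result = asyncio.run(collect_async(fake_async_stream(["Hel", "lo", None, "!"])))
print(result)
```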
In a FastAPI route, yield chunks into a StreamingResponse:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()


@app.get("/stream")
async def stream_gpt():
    async def generate():
        async with await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Tell me something interesting."}],
            stream=True,
        ) as stream:
            async for chunk in stream:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta.content
                if delta:
                    yield delta

    return StreamingResponse(generate(), media_type="text/plain")
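If you serve the response as server-sent events (media type text/event-stream) instead of plain text, each fragment has to be framed as an event: a data: line followed by a blank line. The helpers below are one possible framing, not something FastAPI or the OpenAI SDK provides. The names sse_event and sse_done, the {"delta": ...} payload shape, and the [DONE] marker are all choices made for this sketch.

```python
import json


def sse_event(text):
    """Frame one text fragment as a server-sent event.

    JSON-encoding the payload escapes newlines inside the fragment, so they
    cannot be mistaken for the blank line that terminates an SSE event.
    """
    payload = json.dumps({"delta": text})
    return f"data: {payload}\n\n"


def sse_done():
    """End-of-stream marker; the [DONE] payload is this sketch's own convention."""
    return "data: [DONE]\n\n"


print(sse_event("Hi\nthere"), end="")
print(sse_done(), end="")
```

Inside a generator like generate() above, you would yield sse_event(delta) for each fragment, yield sse_done() after the loop, and pass media_type="text/event-stream" to StreamingResponse.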
Streaming with tool and function calls
When the model decides to call a tool, the delta carries incremental tool call data in delta.tool_calls rather than text in delta.content, and the final chunk's finish_reason is "tool_calls". Each tool_calls item has a function.arguments field that arrives as a partial JSON string fragment, so you must accumulate these fragments across chunks and parse the JSON only once finish_reason signals that the tool calls are complete.
import json

from openai import OpenAI

client = OpenAI()
tool_calls_buffer = {}  # tool call index -> accumulated name and arguments

with client.chat.completions.create(
    model="gpt-4o",
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }],
    messages=[{"role": "user", "content": "What is the weather in Tokyo?"}],
    stream=True,
) as stream:
    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        finish_reason = chunk.choices[0].finish_reason
        if delta.content:
            print(delta.content, end="", flush=True)
        if delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index
                if idx not in tool_calls_buffer:
                    tool_calls_buffer[idx] = {"name": "", "arguments": ""}
                if tc.function is None:
                    # Defensive: the SDK types this field as optional
                    continue
                if tc.function.name:
                    tool_calls_buffer[idx]["name"] += tc.function.name
                if tc.function.arguments:
                    tool_calls_buffer[idx]["arguments"] += tc.function.arguments
        if finish_reason == "tool_calls":
            # All fragments have arrived, so the JSON is complete and safe to parse
            for idx, tc in tool_calls_buffer.items():
                args = json.loads(tc["arguments"])
                print(f"\nTool: {tc['name']}, Args: {args}")
The key point: function.arguments is a partial JSON string in each chunk. Concatenate all fragments indexed by tc.index (which identifies which tool call the chunk belongs to when the model calls multiple tools at once), then parse the full string at the end.
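The accumulation step can be isolated and tested without calling the API. The sketch below feeds hand-built fragments through the same index-keyed merge. The helpers tc_delta and accumulate are hypothetical, and the interleaving of two calls is contrived to show why the index matters, since a real stream may send one call's fragments contiguously.

```python
import json
from types import SimpleNamespace


def tc_delta(index, name=None, arguments=None):
    """Stand-in for one item of delta.tool_calls (illustration only)."""
    return SimpleNamespace(
        index=index,
        function=SimpleNamespace(name=name, arguments=arguments),
    )


def accumulate(tool_call_deltas):
    """Merge fragments by index, then parse each completed arguments string."""
    buffer = {}
    for tc in tool_call_deltas:
        entry = buffer.setdefault(tc.index, {"name": "", "arguments": ""})
        if tc.function.name:
            entry["name"] += tc.function.name
        if tc.function.arguments:
            entry["arguments"] += tc.function.arguments
    # Parse only after every fragment has been merged.
    return [
        {"name": entry["name"], "arguments": json.loads(entry["arguments"])}
        for _, entry in sorted(buffer.items())
    ]


fragments = [
    tc_delta(0, name="get_weather", arguments=""),
    tc_delta(0, arguments='{"ci'),
    tc_delta(1, name="get_weather", arguments='{"city": '),
    tc_delta(0, arguments='ty": "Tokyo"}'),
    tc_delta(1, arguments='"Osaka"}'),
]
calls = accumulate(fragments)
print(calls)
```

Calling json.loads on any single fragment here, such as '{"ci', would raise json.JSONDecodeError, which is why parsing waits until the merge is complete.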
Getting usage stats while streaming
By default, token usage data is not included in streaming responses. To get it, add stream_options={"include_usage": True} to your create() call. This appends one final chunk to the stream where choices is an empty list and usage is populated:
from openai import OpenAI

client = OpenAI()
usage_data = None

with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain REST APIs briefly."}],
    stream=True,
    stream_options={"include_usage": True},
) as stream:
    for chunk in stream:
        if not chunk.choices:
            # This is the final usage chunk
            usage_data = chunk.usage
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)

if usage_data:
    print(f"\n\nPrompt tokens: {usage_data.prompt_tokens}")
    print(f"Completion tokens: {usage_data.completion_tokens}")
    print(f"Total tokens: {usage_data.total_tokens}")
The critical detail: when stream_options={"include_usage": True} is set, the last chunk has choices=[]. Accessing chunk.choices[0] on this chunk raises an IndexError. The if not chunk.choices: continue pattern shown here prevents that. Capture chunk.usage on that same branch to retain the stats.
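The failure mode and the guard can both be reproduced offline. The following sketch builds a simulated stream whose last chunk has an empty choices list. The helpers text_chunk, usage_chunk, and consume are hypothetical, and the token counts are made-up illustration values, not real measurements.

```python
from types import SimpleNamespace


def text_chunk(content):
    """Stand-in for a normal chunk: one choice, no usage."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)], usage=None)


def usage_chunk(prompt_tokens, completion_tokens):
    """Stand-in for the trailing usage chunk: empty choices, usage populated."""
    usage = SimpleNamespace(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )
    return SimpleNamespace(choices=[], usage=usage)


def consume(stream):
    """Return (text, usage), skipping the choices lookup on the usage chunk."""
    parts, usage = [], None
    for chunk in stream:
        if not chunk.choices:
            usage = chunk.usage
            continue
        content = chunk.choices[0].delta.content
        if content:
            parts.append(content)
    return "".join(parts), usage


# Illustrative values only.
simulated = [text_chunk("REST is "), text_chunk("simple."), usage_chunk(12, 5)]
text, usage = consume(simulated)
print(text, usage.total_tokens)

# Without the guard, indexing into the usage chunk fails:
try:
    usage_chunk(12, 5).choices[0]
except IndexError as err:
    print("IndexError:", err)
```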
FAQ
- How do I stream an OpenAI API response in Python?
  Pass stream=True to client.chat.completions.create(). Iterate over the returned stream and read chunk.choices[0].delta.content for each text chunk, checking for None before printing. The final chunk has choices[0].finish_reason set to "stop" (or another stop reason).
- How do I get token usage stats when streaming the OpenAI API?
  By default, usage is not included in streaming responses. Add stream_options={"include_usage": True} to your create() call. This adds a final chunk where choices is an empty list and usage contains the prompt and completion token counts. Handle the empty choices list in your loop to avoid an IndexError.
- Can I stream OpenAI API responses in FastAPI?
  Yes. Use AsyncOpenAI() and iterate with async for. In a FastAPI route, wrap the async generator in StreamingResponse with media_type="text/plain" (or "text/event-stream" for SSE). Use async with await client.chat.completions.create(..., stream=True) as stream: and async for chunk in stream: to yield chunks.
Last updated May 28, 2026. Code examples verified against OpenAI Python SDK v1.x and the OpenAI streaming API documentation.